diff --git a/dbcon/joblist/jlf_common.h b/dbcon/joblist/jlf_common.h index 1f6811f58..745706522 100644 --- a/dbcon/joblist/jlf_common.h +++ b/dbcon/joblist/jlf_common.h @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -365,6 +366,10 @@ struct JobInfo std::vector dynamicParseTreeVec; PrimitiveServerThreadPools primitiveServerThreadPools; + // Represents a `join edges` and `join id` to be restored in `join order` part. + std::map, uint32_t> joinEdgesToRestore; + // Represents a pair of `table` to be on a large side and weight associated with that table. + std::unordered_map tablesForLargeSide; private: // defaults okay diff --git a/dbcon/joblist/jlf_tuplejoblist.cpp b/dbcon/joblist/jlf_tuplejoblist.cpp index 33a1f0104..580be6dea 100644 --- a/dbcon/joblist/jlf_tuplejoblist.cpp +++ b/dbcon/joblist/jlf_tuplejoblist.cpp @@ -1591,36 +1591,80 @@ bool addFunctionJoin(vector& joinedTables, JobStepVector& joinSteps, s return added; } -void collectCycles(JoinGraph& joinGraph, const JobInfo& jobInfo, TableInfoMap& tableInfoMap, - uint32_t currentTable, uint32_t prevTable, JoinEdges& edgesToTransform, Cycles& cycles) +// This class represents a circular inner join graph transformer. +// It collects a cycles in the given join graph as disjoint set of the join edges, for each collected cycle +// removes the join edge (if column statistics available it tries to remove join edge which could produce +// highest intermediate join result, otherwise just a random edge in a cycle) and adds that join edge to +// jobinfo class be restored as `post join` equal fiter in a join ordering part. +class CircularJoinGraphTransformer { - // Mark as visited. - joinGraph[currentTable].fVisited = true; + public: + // Ctor. + CircularJoinGraphTransformer(TableInfoMap& infoMap, JobInfo& jobInfo, JobStepVector& joinSteps) + : infoMap(infoMap), jobInfo(jobInfo), joinSteps(joinSteps) + { + } + // Delete all other ctrs/dctrs. + CircularJoinGraphTransformer() = delete; + CircularJoinGraphTransformer(const CircularJoinGraphTransformer&) = delete; + CircularJoinGraphTransformer(CircularJoinGraphTransformer&&) = delete; + CircularJoinGraphTransformer& operator=(const CircularJoinGraphTransformer&) = delete; + CircularJoinGraphTransformer& operator=(CircularJoinGraphTransformer&&) = delete; + virtual ~CircularJoinGraphTransformer() = default; + + // Transform join graph. + void transformJoinGraph(); + + protected: + // Analyzes the `join graph` based on DFS algorithm. + virtual void analyzeJoinGraph(uint32_t currentTable, uint32_t prevTable); + // For each cycle breaks it and collects join edges. + void breakCyclesAndCollectJoinEdges(); + // Removes given `join edge` from the `join graph`. + void breakCycleAndCollectJoinEdge(const std::pair& edgeForward); + // Initializes the `join graph` based on the table connections. + virtual void initializeJoinGraph(); + // Check if the given join edge has FK - FK relations. + bool isForeignKeyForeignKeyLink(const JoinEdge& edge, statistics::StatisticsManager* statisticsManager); + // Based on column statistics tries to search `join edge` with maximum join cardinality. + virtual void chooseEdgeToTransform(Cycle& cycle, std::pair& resultEdge); + // Removes given `tableId` from adjacent list. + void removeFromAdjacentList(uint32_t tableId, std::vector& adjList); + // Removes associated join step associated with the given `joinEdge` from job steps. + void removeAssociatedHashJoinStepFromJoinSteps(const JoinEdge& joinEdge); + + // Join information. + TableInfoMap& infoMap; + JobInfo& jobInfo; + JobStepVector& joinSteps; + + // Represents a collection of cycles. + Cycles cycles; + // Represents internal `join graph.` + JoinGraph joinGraph; + // Represents a set of join edges to erase for each cycle in `join graph`. + JoinEdges edgesToTransform; + // Represents a table to start analysis. + uint32_t headTable{0}; +}; + +// Circular inner joins methods. +void CircularJoinGraphTransformer::analyzeJoinGraph(uint32_t currentTable, uint32_t prevTable) +{ + // Mark as `GREY` to specify processing table node. + joinGraph[currentTable].fTableColor = JoinTableColor::GREY; joinGraph[currentTable].fParent = prevTable; // For each adjacent node. for (const auto adjNode : joinGraph[currentTable].fAdjacentList) { - // If visited and not a back edge consider as a cycle. - if (joinGraph[adjNode].fVisited && prevTable != adjNode) + if (prevTable != adjNode) { - Cycle cycle; - const auto edgeForward = make_pair(currentTable, adjNode); - const auto edgeBackward = make_pair(adjNode, currentTable); - - if (!edgesToTransform.count(edgeForward) && !edgesToTransform.count(edgeBackward)) + if (joinGraph[adjNode].fTableColor == JoinTableColor::GREY) { - edgesToTransform.insert(edgeForward); - cycle.push_back(edgeForward); - } - - auto nodeIt = currentTable; - auto nextNode = joinGraph[nodeIt].fParent; - // Walk back until we find node `adjNode` we identified before. - while (nextNode != UINT_MAX && nextNode != adjNode) - { - const auto edgeForward = make_pair(nextNode, nodeIt); - const auto edgeBackward = make_pair(nodeIt, nextNode); + Cycle cycle; + const auto edgeForward = make_pair(currentTable, adjNode); + const auto edgeBackward = make_pair(adjNode, currentTable); if (!edgesToTransform.count(edgeForward) && !edgesToTransform.count(edgeBackward)) { @@ -1628,69 +1672,80 @@ void collectCycles(JoinGraph& joinGraph, const JobInfo& jobInfo, TableInfoMap& t cycle.push_back(edgeForward); } - nodeIt = nextNode; - nextNode = joinGraph[nodeIt].fParent; - } - - // Add the last edge. - if (nextNode != UINT_MAX) - { - const auto edgeForward = make_pair(nextNode, nodeIt); - const auto edgeBackward = make_pair(nodeIt, nextNode); - - if (!edgesToTransform.count(edgeForward) && !edgesToTransform.count(edgeBackward)) + auto nodeIt = currentTable; + auto nextNode = joinGraph[nodeIt].fParent; + // Walk back until we find node `adjNode` we identified before. + while (nextNode != UINT_MAX && nextNode != adjNode) { - edgesToTransform.insert(edgeForward); - cycle.push_back(edgeForward); - } - } + const auto edgeForward = make_pair(nextNode, nodeIt); + const auto edgeBackward = make_pair(nodeIt, nextNode); - if (jobInfo.trace && cycle.size()) - { - std::cout << "Collected cycle (while walking join graph): " << std::endl; - for (const auto& edge : cycle) - { - cout << "Edge: " << edge.first << " -> " << edge.second << endl; - auto it = jobInfo.tableJoinMap.find(edge); - - cout << "Left keys: " << endl; - for (auto key : it->second.fLeftKeys) + if (!edgesToTransform.count(edgeForward) && !edgesToTransform.count(edgeBackward)) { - cout << "Key: " << key << " column oid: " << jobInfo.keyInfo->tupleKeyVec[key].fId << endl; + edgesToTransform.insert(edgeForward); + cycle.push_back(edgeForward); } - cout << "Right keys: " << endl; - for (auto key : it->second.fRightKeys) + nodeIt = nextNode; + nextNode = joinGraph[nodeIt].fParent; + } + + // Add the last edge. + if (nextNode != UINT_MAX) + { + const auto edgeForward = make_pair(nextNode, nodeIt); + const auto edgeBackward = make_pair(nodeIt, nextNode); + + if (!edgesToTransform.count(edgeForward) && !edgesToTransform.count(edgeBackward)) { - cout << "Key: " << key << " column oid: " << jobInfo.keyInfo->tupleKeyVec[key].fId << endl; + edgesToTransform.insert(edgeForward); + cycle.push_back(edgeForward); } } - } - // Collect the cycle. - if (cycle.size()) - cycles.push_back(std::move(cycle)); - } - // If not visited - go there. - else if (joinGraph[adjNode].fVisited == false) - { - collectCycles(joinGraph, jobInfo, tableInfoMap, adjNode, currentTable, edgesToTransform, cycles); + if (jobInfo.trace && cycle.size()) + { + std::cout << "Cycle found.\n"; + std::cout << "Collected cycle \n"; + for (const auto& edge : cycle) + std::cout << "Join edge: " << edge.first << " <-> " << edge.second << '\n'; + } + + // Collect the cycle. + if (cycle.size()) + cycles.push_back(std::move(cycle)); + } + // If not visited - go there. + else if (joinGraph[adjNode].fTableColor == JoinTableColor::WHITE) + { + analyzeJoinGraph(adjNode, currentTable); + } } } + + // Mark `BLACK` to specify this node is finished. + joinGraph[currentTable].fTableColor = JoinTableColor::BLACK; } -void removeFromList(uint32_t tableId, std::vector& adjList) +void CircularJoinGraphTransformer::removeFromAdjacentList(uint32_t tableId, std::vector& adjList) { auto tableIdIt = std::find(adjList.begin(), adjList.end(), tableId); if (tableIdIt != adjList.end()) adjList.erase(tableIdIt); } -bool isForeignKeyForeignKeyLink(TableInfoMap& infoMap, const JobInfo& jobInfo, - const pair& edge, - statistics::StatisticsManager* statisticsManager) +bool CircularJoinGraphTransformer::isForeignKeyForeignKeyLink( + const JoinEdge& edge, statistics::StatisticsManager* statisticsManager) { - const auto it = jobInfo.tableJoinMap.find(edge); + const auto end = jobInfo.tableJoinMap.end(); + auto it = jobInfo.tableJoinMap.find(edge); + if (it == end) + { + it = jobInfo.tableJoinMap.find(make_pair(edge.second, edge.first)); + if (it == end) + return false; + } + std::vector leftKeys, rightKeys; std::vector lOid, rOid; @@ -1757,66 +1812,138 @@ bool isForeignKeyForeignKeyLink(TableInfoMap& infoMap, const JobInfo& jobInfo, return false; } -void chooseEdgeToTransform(TableInfoMap& infoMap, const JobInfo& jobInfo, Cycle& cycle, - JoinEdges& edgesToTransform, std::pair& resultEdge) +void CircularJoinGraphTransformer::chooseEdgeToTransform(Cycle& cycle, + std::pair& resultEdge) { + // Use statistics if possible. auto* statisticsManager = statistics::StatisticsManager::instance(); - for (auto& edgeForward : cycle) { - if (isForeignKeyForeignKeyLink(infoMap, jobInfo, edgeForward, statisticsManager)) + // Check that `join edge` is aligned with our needs. + if (isForeignKeyForeignKeyLink(edgeForward, statisticsManager)) { const auto edgeBackward = std::make_pair(edgeForward.second, edgeForward.first); - if (!edgesToTransform.count(edgeForward) && !edgesToTransform.count(edgeBackward)) + if (!jobInfo.joinEdgesToRestore.count(edgeForward) && !jobInfo.joinEdgesToRestore.count(edgeBackward)) { - edgesToTransform.insert(edgeForward); - resultEdge = edgeForward; + resultEdge = std::make_pair(edgeForward, 0 /*Dummy weight*/); return; } } } if (jobInfo.trace) - std::cout << "FK FK key not found, removing the first one " << std::endl; + std::cout << "FK FK key not found, removing the first one inner join edge" << std::endl; - // FIXME: Use size of columns to possible largest cardinality. // Take just a first. - edgesToTransform.insert(cycle.front()); - resultEdge = cycle.front(); + resultEdge = std::make_pair(cycle.front(), 0 /*Dummy weight*/); } -void breakCyclesAndCollectEdges(TableInfoMap& infoMap, const JobInfo& jobInfo, Cycles& cycles, - JoinEdges& edgesToTransform) +void CircularJoinGraphTransformer::removeAssociatedHashJoinStepFromJoinSteps(const JoinEdge& joinEdge) { - for (auto& cycle : cycles) + if (jobInfo.trace) { - std::pair edgeForward; - if (cycle.size() == 0) - continue; + std::cout << "Join steps before transformation: " << std::endl; + for (auto joinStepIt = joinSteps.begin(); joinStepIt < joinSteps.end(); joinStepIt++) - chooseEdgeToTransform(infoMap, jobInfo, cycle, edgesToTransform, edgeForward); - - if (jobInfo.trace) { - std::cout << "Remove " << edgeForward.first << " from adjlist of " << edgeForward.second << std::endl; + auto* tupleHashJoinStep = dynamic_cast(joinStepIt->get()); + if (tupleHashJoinStep) + { + std::cout << "Tables for hash join: " << getTableKey(jobInfo, tupleHashJoinStep->tupleId1()) + << " <-> " << getTableKey(jobInfo, tupleHashJoinStep->tupleId2()) << std::endl; + } } + } - // If not present add the edge. - auto tableInfoIt = jobInfo.tableJoinMap.find(edgeForward); + // Match the given `join edge` in `join steps` vector. + auto end = joinSteps.end(); + auto joinStepIt = joinSteps.begin(); + // We have to remove all `TupleHashJoinSteps` with the given table keys from join steps. + while (joinStepIt != end) + { + auto* tupleHashJoinStep = dynamic_cast(joinStepIt->get()); + if (tupleHashJoinStep) + { + const auto tableKey1 = getTableKey(jobInfo, tupleHashJoinStep->tupleId1()); + const auto tableKey2 = getTableKey(jobInfo, tupleHashJoinStep->tupleId2()); - auto& firstExp2 = infoMap[edgeForward.first].fColsInExp2; - firstExp2.insert(firstExp2.end(), tableInfoIt->second.fLeftKeys.begin(), - tableInfoIt->second.fLeftKeys.end()); - auto& secondExp2 = infoMap[edgeForward.second].fColsInExp2; - secondExp2.insert(secondExp2.end(), tableInfoIt->second.fRightKeys.begin(), - tableInfoIt->second.fRightKeys.end()); + if ((tableKey1 == joinEdge.first && tableKey2 == joinEdge.second) || + (tableKey1 == joinEdge.second && tableKey2 == joinEdge.first)) + { - removeFromList(edgeForward.first, infoMap[edgeForward.second].fAdjacentList); - removeFromList(edgeForward.second, infoMap[edgeForward.first].fAdjacentList); + if (jobInfo.trace) + std::cout << "Erase matched join step with keys: " << tableKey1 << " <-> " << tableKey2 + << std::endl; + + joinStepIt = joinSteps.erase(joinStepIt); + end = joinSteps.end(); + } + else + { + ++joinStepIt; + } + } + } + + if (jobInfo.trace) + { + std::cout << "Join steps after transformation: " << std::endl; + for (auto joinStepIt = joinSteps.begin(); joinStepIt < joinSteps.end(); joinStepIt++) + + { + auto* tupleHashJoinStep = dynamic_cast(joinStepIt->get()); + if (tupleHashJoinStep) + { + std::cout << "Tables for hash join: " << getTableKey(jobInfo, tupleHashJoinStep->tupleId1()) + << " <-> " << getTableKey(jobInfo, tupleHashJoinStep->tupleId2()) << std::endl; + } + } } } -void initJoinGraph(const TableInfoMap& infoMap, JoinGraph& joinGraph) +void CircularJoinGraphTransformer::breakCycleAndCollectJoinEdge( + const std::pair& joinEdgeWithWeight) +{ + // Add edge to be restored. + jobInfo.joinEdgesToRestore.insert({joinEdgeWithWeight.first, joinEdgeWithWeight.second}); + const auto edgeForward = joinEdgeWithWeight.first; + + // Keep key columns in result rowgroups, to avoid columns elimination at the intermediate joins. + auto tableInfoIt = jobInfo.tableJoinMap.find(edgeForward); + auto& firstExp2 = infoMap[edgeForward.first].fColsInExp2; + firstExp2.insert(firstExp2.end(), tableInfoIt->second.fLeftKeys.begin(), + tableInfoIt->second.fLeftKeys.end()); + auto& secondExp2 = infoMap[edgeForward.second].fColsInExp2; + secondExp2.insert(secondExp2.end(), tableInfoIt->second.fRightKeys.begin(), + tableInfoIt->second.fRightKeys.end()); + + // The edge is choosen on the previous step, we have to remove it from `adjacent list`. + removeFromAdjacentList(edgeForward.first, infoMap[edgeForward.second].fAdjacentList); + removeFromAdjacentList(edgeForward.second, infoMap[edgeForward.first].fAdjacentList); + + if (jobInfo.trace) + std::cout << "Remove from cycle join edge: " << edgeForward.first << " <-> " << edgeForward.second + << std::endl; + + // Remove all associated `TupleHashJoinSteps` from join steps. + removeAssociatedHashJoinStepFromJoinSteps(edgeForward); +} + +void CircularJoinGraphTransformer::breakCyclesAndCollectJoinEdges() +{ + if (jobInfo.trace) + std::cout << "Collected cycles size: " << cycles.size() << std::endl; + + // For each cycle. + for (auto& cycle : cycles) + { + std::pair joinEdgeWithWeight; + chooseEdgeToTransform(cycle, joinEdgeWithWeight); + breakCycleAndCollectJoinEdge(joinEdgeWithWeight); + } +} + +void CircularJoinGraphTransformer::initializeJoinGraph() { for (const auto& infoPair : infoMap) { @@ -1825,38 +1952,292 @@ void initJoinGraph(const TableInfoMap& infoMap, JoinGraph& joinGraph) joinTableNode.fAdjacentList = infoPair.second.fAdjacentList; joinGraph[infoPair.first] = joinTableNode; } + + // For inner join we can choose any table to be a head. + headTable = joinGraph.begin()->first; } -void collectEdgesAndBreakCycles(TableInfoMap& infoMap, const JobInfo& jobInfo, JoinEdges& edgesToTransform) +void CircularJoinGraphTransformer::transformJoinGraph() { - JoinGraph joinGraph; - initJoinGraph(infoMap, joinGraph); - Cycles cycles; + initializeJoinGraph(); + analyzeJoinGraph(/*currentTable=*/headTable, /*prevTable=*/UINT_MAX); + edgesToTransform.clear(); + breakCyclesAndCollectJoinEdges(); +} - collectCycles(joinGraph, jobInfo, infoMap, - /*currentTable=*/joinGraph.begin()->first, - /*prevTable=*/UINT_MAX, edgesToTransform, cycles); - - if (jobInfo.trace) +// This class represents circular outer join graph transformer. +// It defines a weight for the particular edge in join graph from the prioriy of the joins defined by +// the user. +// In general lets a assume we have a cycle t1 -> t2 -> ... ti ... tn -> t1 (where n is natural number >= 3), +// the weighted graph will have 2 edges with maximum weights among other - (tn -> t1) and (tn - 1 -> tn) +// those are candidates for transformation. +// Adds a table with associated `join edge` to be on a large side. +class CircularOuterJoinGraphTransformer : public CircularJoinGraphTransformer +{ + public: + // Ctor. + CircularOuterJoinGraphTransformer(TableInfoMap& infoMap, JobInfo& jobInfo, JobStepVector& joinSteps) + : CircularJoinGraphTransformer(infoMap, jobInfo, joinSteps) { - std::cout << "Collected cycles (after walking join graph): " << std::endl; - for (const auto& cycle : cycles) + } + // Delete all other ctrs/dcts. + CircularOuterJoinGraphTransformer() = delete; + CircularOuterJoinGraphTransformer(const CircularOuterJoinGraphTransformer&) = delete; + CircularOuterJoinGraphTransformer(CircularOuterJoinGraphTransformer&&) = delete; + CircularOuterJoinGraphTransformer& operator=(const CircularOuterJoinGraphTransformer&) = delete; + CircularOuterJoinGraphTransformer& operator=(CircularOuterJoinGraphTransformer&&) = delete; + ~CircularOuterJoinGraphTransformer() override = default; + + private: + // Analyzes the given `join graph`. + void analyzeJoinGraph(uint32_t currentTable, uint32_t prevTable) override; + // Chooses a join edge to transform from the given cycle based on the join edge weight, + // the join edge for transformation has a maximum weight in a cycle. + void chooseEdgeToTransform(Cycle& cycle, std::pair& resultEdge) override; + // Returns the min weight among all join weights related to the given `headTable`. + uint32_t getSublingsMinWeight(uint32_t headTable, uint32_t associatedTable); + // Returns the max weight which is less than given `upperBoundWeight` among all join weights related to + // the given `headTable`. + uint32_t getSublingsMaxWeightLessThan(uint32_t headTable, uint32_t associatedTable, + uint32_t upperBoundWeight); + // Initializes `join graph` from the table connections. + void initializeJoinGraph() override; + + // The map which represents a weight for each join edge in join graph. + std::map joinEdgesToWeights; +}; + +uint32_t CircularOuterJoinGraphTransformer::getSublingsMinWeight(uint32_t headTable, uint32_t associatedTable) +{ + uint32_t minWeight = UINT_MAX; + for (const auto adjNode : joinGraph[headTable].fAdjacentList) + { + if (adjNode != associatedTable) { - std::cout << "Collected cycle: " << std::endl; - for (const auto& edge : cycle) + JoinEdge joinEdge(adjNode, headTable); + minWeight = std::min(joinEdgesToWeights[joinEdge], minWeight); + } + } + return minWeight; +} + +uint32_t CircularOuterJoinGraphTransformer::getSublingsMaxWeightLessThan(uint32_t headTable, + uint32_t associatedTable, + uint32_t upperBoundWeight) +{ + uint32_t maxWeight = 0; + for (const auto adjNode : joinGraph[headTable].fAdjacentList) + { + if (adjNode != associatedTable) + { + JoinEdge joinEdge(adjNode, headTable); + const auto currentWeight = joinEdgesToWeights[joinEdge]; + if (currentWeight < upperBoundWeight) + maxWeight = std::max(currentWeight, maxWeight); + } + } + return maxWeight; +} + +void CircularOuterJoinGraphTransformer::initializeJoinGraph() +{ + // Initialize a join graph at first. + CircularJoinGraphTransformer::initializeJoinGraph(); + + // Associate join weights. + if (jobInfo.trace) + std::cout << "Join edges with weights.\n"; + + uint32_t minWeightFullGraph = UINT_MAX; + JoinEdge joinEdgeWithMinWeight(0, 0); + + // For each join step we associate a `join id` with `join edge`. + for (auto joinStepIt = joinSteps.begin(); joinStepIt < joinSteps.end(); joinStepIt++) + { + auto* tupleHashJoinStep = dynamic_cast(joinStepIt->get()); + if (tupleHashJoinStep) + { + const uint32_t weight = tupleHashJoinStep->joinId(); + const auto tableKey1 = getTableKey(jobInfo, tupleHashJoinStep->tupleId1()); + const auto tableKey2 = getTableKey(jobInfo, tupleHashJoinStep->tupleId2()); + + // Edge forward. + JoinEdge edgeForward{tableKey1, tableKey2}; + auto joinEdgeWeightIt = joinEdgesToWeights.find(edgeForward); + if (joinEdgeWeightIt == joinEdgesToWeights.end()) + joinEdgesToWeights.insert({edgeForward, weight}); + else + joinEdgeWeightIt->second = std::max(weight, joinEdgeWeightIt->second); + + // Edge backward. + JoinEdge edgeBackward{tableKey2, tableKey1}; + joinEdgeWeightIt = joinEdgesToWeights.find(edgeBackward); + if (joinEdgeWeightIt == joinEdgesToWeights.end()) + joinEdgesToWeights.insert({edgeBackward, weight}); + else + joinEdgeWeightIt->second = std::max(weight, joinEdgeWeightIt->second); + + if (minWeightFullGraph > weight) { - std::cout << edge.first << " -> " << edge.second << std::endl; + minWeightFullGraph = weight; + joinEdgeWithMinWeight = edgeForward; } + + if (jobInfo.trace) + std::cout << edgeForward.first << " <-> " << edgeForward.second << " : " << weight << std::endl; } } - edgesToTransform.clear(); - // Finally break the cycles by removing collected edges from the graph. - breakCyclesAndCollectEdges(infoMap, jobInfo, cycles, edgesToTransform); + if (jobInfo.trace) + std::cout << "Minimum weight edge is: " << joinEdgeWithMinWeight.first << " <-> " + << joinEdgeWithMinWeight.second << std::endl; + + // Search for `head table` by the given join edge, we have 2 candidates. + // The head table is opposite to the table which has a join edge with minimum weight among all edges related + // to that table. + if (getSublingsMinWeight(joinEdgeWithMinWeight.first, joinEdgeWithMinWeight.second) > + getSublingsMinWeight(joinEdgeWithMinWeight.second, joinEdgeWithMinWeight.first)) + headTable = joinEdgeWithMinWeight.first; + else + headTable = joinEdgeWithMinWeight.second; + + if (jobInfo.trace) + std::cout << "Head table is: " << headTable << std::endl; } -void spanningTreeCheck(TableInfoMap& tableInfoMap, JobStepVector joinSteps, JobInfo& jobInfo, - JoinEdges& edgesToTransform) +void CircularOuterJoinGraphTransformer::analyzeJoinGraph(uint32_t currentTable, uint32_t prevTable) +{ + joinGraph[currentTable].fTableColor = JoinTableColor::GREY; + joinGraph[currentTable].fParent = prevTable; + + std::vector> adjacentListWeighted; + // For each adjacent node. + for (const auto adjNode : joinGraph[currentTable].fAdjacentList) + { + if (prevTable != adjNode) + { + const JoinEdge joinEdge{currentTable, adjNode}; + const auto weight = joinEdgesToWeights[joinEdge]; + adjacentListWeighted.push_back({adjNode, weight}); + } + } + + // Sort vertices by weights. + std::sort(adjacentListWeighted.begin(), adjacentListWeighted.end(), + [](const std::pair& a, const std::pair& b) { + return a.second < b.second; + }); + + // For each weighted adjacent node. + for (const auto& adjNodeWeighted : adjacentListWeighted) + { + const auto adjNode = adjNodeWeighted.first; + // If visited and not a back edge consider as a cycle. + if (joinGraph[adjNode].fTableColor == JoinTableColor::GREY) + { + Cycle cycle; + const auto edgeForward = make_pair(currentTable, adjNode); + const auto edgeBackward = make_pair(adjNode, currentTable); + + if (!edgesToTransform.count(edgeForward) && !edgesToTransform.count(edgeBackward)) + { + edgesToTransform.insert(edgeForward); + cycle.push_back(edgeForward); + } + + auto nodeIt = currentTable; + auto nextNode = joinGraph[nodeIt].fParent; + // Walk back until we find node `adjNode` we identified before. + while (nextNode != UINT_MAX && nextNode != adjNode) + { + const auto edgeForward = make_pair(nextNode, nodeIt); + const auto edgeBackward = make_pair(nodeIt, nextNode); + + if (!edgesToTransform.count(edgeForward) && !edgesToTransform.count(edgeBackward)) + { + edgesToTransform.insert(edgeForward); + cycle.push_back(edgeForward); + } + + nodeIt = nextNode; + nextNode = joinGraph[nodeIt].fParent; + } + + // Add the last edge. + if (nextNode != UINT_MAX) + { + const auto edgeForward = make_pair(nextNode, nodeIt); + const auto edgeBackward = make_pair(nodeIt, nextNode); + + if (!edgesToTransform.count(edgeForward) && !edgesToTransform.count(edgeBackward)) + { + edgesToTransform.insert(edgeForward); + cycle.push_back(edgeForward); + } + } + + // Collect the cycle. + if (cycle.size()) + cycles.push_back(std::move(cycle)); + } + else if (joinGraph[adjNode].fTableColor == JoinTableColor::WHITE) + { + analyzeJoinGraph(adjNode, currentTable); + } + } + + joinGraph[currentTable].fTableColor = JoinTableColor::BLACK; +} + +void CircularOuterJoinGraphTransformer::chooseEdgeToTransform(Cycle& cycle, + std::pair& resultEdge) +{ + uint32_t maxWeightInCycle = 0; + JoinEdge joinEdgeWithMaxWeight; + + if (jobInfo.trace) + std::cout << "Collected cycle:\n"; + + // Search for a join edge with max weight in a given cycle. + for (const auto& edgeForward : cycle) + { + if (jobInfo.trace) + std::cout << "Join edge: " << edgeForward.first << " <-> " << edgeForward.second + << " with weight: " << joinEdgesToWeights[edgeForward] << "\n"; + + if (joinEdgesToWeights[edgeForward] > maxWeightInCycle) + { + maxWeightInCycle = joinEdgesToWeights[edgeForward]; + joinEdgeWithMaxWeight = edgeForward; + } + } + + if (jobInfo.trace) + std::cout << "Join edge with max weight in a cycle: " << joinEdgeWithMaxWeight.first << " <-> " + << joinEdgeWithMaxWeight.second << " weight: " << maxWeightInCycle << "\n"; + + // Search for `large side table`. The `large side table` is table related to the maximum join edge in a + // cycle, it has a maximum weight among all join edges related to that table and less than maximum join edge + // in a cycle. + uint32_t largeSideTable = joinEdgeWithMaxWeight.first; + if (getSublingsMaxWeightLessThan(joinEdgeWithMaxWeight.second, joinEdgeWithMaxWeight.first, + maxWeightInCycle) > + getSublingsMaxWeightLessThan(joinEdgeWithMaxWeight.first, joinEdgeWithMaxWeight.second, + maxWeightInCycle)) + largeSideTable = joinEdgeWithMaxWeight.second; + + // Add large table to the map for the `join ordering` part. + if (!jobInfo.tablesForLargeSide.count(largeSideTable)) + jobInfo.tablesForLargeSide.insert({largeSideTable, maxWeightInCycle}); + + if (jobInfo.trace) + std::cout << "Large side table: " << largeSideTable << std::endl; + + // Assign a result edge. + resultEdge = std::make_pair(joinEdgeWithMaxWeight, maxWeightInCycle); +} + +void spanningTreeCheck(TableInfoMap& tableInfoMap, JobStepVector& joinSteps, JobInfo& jobInfo) { bool spanningTree = true; unsigned errcode = 0; @@ -2109,17 +2490,13 @@ void spanningTreeCheck(TableInfoMap& tableInfoMap, JobStepVector joinSteps, JobI // 2. Cycles. if (spanningTree && (nodeSet.size() - pathSet.size() / 2 != 1)) { - // 2.1. Inner. + std::unique_ptr joinGraphTransformer; if (jobInfo.outerOnTable.size() == 0) - { - collectEdgesAndBreakCycles(tableInfoMap, jobInfo, edgesToTransform); - } - // 2.2. Outer. + joinGraphTransformer.reset(new CircularJoinGraphTransformer(tableInfoMap, jobInfo, joinSteps)); else - { - errcode = ERR_CIRCULAR_JOIN; - spanningTree = false; - } + joinGraphTransformer.reset(new CircularOuterJoinGraphTransformer(tableInfoMap, jobInfo, joinSteps)); + + joinGraphTransformer->transformJoinGraph(); } } @@ -2364,8 +2741,154 @@ string joinTypeToString(const JoinType& joinType) return ret; } -void matchEdgesInRowGroup(const JobInfo& jobInfo, const RowGroup& rg, JoinEdges& edgesToTransform, - PostJoinFilterKeys& postJoinFilterKeys) +bool matchKeys(const vector& keysToSearch, const vector& keysToMatch) +{ + std::unordered_set keysMap; + for (const auto key : keysToSearch) + keysMap.insert(key); + + for (const auto key : keysToMatch) + { + if (!keysMap.count(key)) + return false; + } + + return true; +} + +void tryToRestoreJoinEdges(JobInfo& jobInfo, JoinInfo* joinInfo, const RowGroup& largeSideRG, + std::vector& smallKeyIndices, std::vector& largeKeyIndices, + std::vector& traces, std::map& joinIndexIdMap, + uint32_t smallSideIndex) +{ + if (!jobInfo.joinEdgesToRestore.size()) + return; + + const RowGroup& smallSideRG = joinInfo->fRowGroup; + ostringstream oss; + if (jobInfo.trace) + oss << "\n\nTry to match edges for the small and large sides rowgroups\n"; + + std::vector smallKeyIndicesToRestore; + std::vector largeKeyIndicesToRestore; + std::vector> takenEdgesWithJoinIDs; + auto& joinEdgesToRestore = jobInfo.joinEdgesToRestore; + + // We could have a multple join edges to restore from the same vertex e.g: + // t1 -> t2 -> t3 + // ^ ^ | + // \ | V + // t5 <- t4 + // Edges to restore: {t5, t2}, {t5, t1} + // Large side row group: {t5} + // Small side row group: {t1, t2, t3, t4} + // Large side join keys: {t5, t5} + // Small side join keys: {t2, t1} + for (const auto& [edge, joinId] : joinEdgesToRestore) + { + auto it = jobInfo.tableJoinMap.find(edge); + // Edge keys. + const auto& leftKeys = it->second.fLeftKeys; + const auto& rightKeys = it->second.fRightKeys; + // Keys for the given rowgroups. + // Large side and small side. + const auto& smallSideKeys = smallSideRG.getKeys(); + const auto& largeSideKeys = largeSideRG.getKeys(); + + // Check if left in large and right in small. + if (matchKeys(largeSideKeys, leftKeys) && matchKeys(smallSideKeys, rightKeys)) + { + for (uint32_t i = 0, e = leftKeys.size(); i < e; ++i) + largeKeyIndicesToRestore.push_back(getKeyIndex(leftKeys[i], largeSideRG)); + + for (uint32_t i = 0, e = rightKeys.size(); i < e; ++i) + smallKeyIndicesToRestore.push_back(getKeyIndex(rightKeys[i], smallSideRG)); + + if (jobInfo.trace) + { + oss << "Keys matched.\n"; + oss << "Left keys:\n"; + for (const auto key : leftKeys) + oss << key << " "; + oss << "\nRight keys:\n"; + for (const auto key : rightKeys) + oss << key << " "; + oss << '\n'; + } + + takenEdgesWithJoinIDs.push_back({edge, joinId}); + continue; + } + + // Otherwise check right in large and left in small. + if (matchKeys(largeSideKeys, rightKeys) && matchKeys(smallSideKeys, leftKeys)) + { + for (uint32_t i = 0, e = rightKeys.size(); i < e; ++i) + largeKeyIndicesToRestore.push_back(getKeyIndex(rightKeys[i], largeSideRG)); + + for (uint32_t i = 0, e = leftKeys.size(); i < e; ++i) + smallKeyIndicesToRestore.push_back(getKeyIndex(leftKeys[i], smallSideRG)); + + if (jobInfo.trace) + { + oss << "Keys matched.\n"; + oss << "Left keys:\n"; + for (const auto key : leftKeys) + oss << key << " "; + oss << "\nRight keys:\n"; + for (const auto key : rightKeys) + oss << key << " "; + oss << '\n'; + } + + takenEdgesWithJoinIDs.push_back({edge, joinId}); + } + } + + // Check if keys were not matched. + if (!smallKeyIndicesToRestore.size()) + { + if (jobInfo.trace) + oss << "Keys not matched.\n\n"; + + traces.push_back(oss.str()); + return; + } + + // Add keys. + smallKeyIndices.insert(smallKeyIndices.end(), smallKeyIndicesToRestore.begin(), + smallKeyIndicesToRestore.end()); + largeKeyIndices.insert(largeKeyIndices.end(), largeKeyIndicesToRestore.begin(), + largeKeyIndicesToRestore.end()); + // Mark as tupeless for multiple keys join. + joinInfo->fJoinData.fTypeless = true; + + // Associate a join id and small side index for the on clause filters and remove taken edges. + for (const auto& [edge, joinId] : takenEdgesWithJoinIDs) + { + joinIndexIdMap[joinId] = smallSideIndex; + auto it = joinEdgesToRestore.find(edge); + joinEdgesToRestore.erase(it); + } + + if (jobInfo.trace) + { + oss << "Keys restored.\n"; + oss << "Small side keys:\n"; + for (const auto key : smallKeyIndices) + oss << key << " "; + oss << "\nLarge side keys:\n"; + for (const auto key : largeKeyIndices) + oss << key << " "; + oss << "\n\n"; + } + + traces.push_back(oss.str()); +} + +void matchEdgesInResultRowGroup(const JobInfo& jobInfo, const RowGroup& rg, + std::map& edgesToRestore, + PostJoinFilterKeys& postJoinFilterKeys) { if (jobInfo.trace) { @@ -2373,11 +2896,12 @@ void matchEdgesInRowGroup(const JobInfo& jobInfo, const RowGroup& rg, JoinEdges& "filter\n"; } - std::vector> takenEdges; - for (const auto& edge : edgesToTransform) + std::vector takenEdges; + for (const auto& [edge, joinId] : edgesToRestore) { - auto it = jobInfo.tableJoinMap.find(edge); std::vector currentKeys; + auto it = jobInfo.tableJoinMap.find(edge); + // Combine keys. currentKeys = it->second.fLeftKeys; currentKeys.insert(currentKeys.end(), it->second.fRightKeys.begin(), it->second.fRightKeys.end()); @@ -2424,8 +2948,8 @@ void matchEdgesInRowGroup(const JobInfo& jobInfo, const RowGroup& rg, JoinEdges& // Erase taken edges. for (const auto& edge : takenEdges) { - auto it = edgesToTransform.find(edge); - edgesToTransform.erase(it); + auto it = edgesToRestore.find(edge); + edgesToRestore.erase(it); } } @@ -2479,7 +3003,6 @@ void createPostJoinFilters(const JobInfo& jobInfo, TableInfoMap& tableInfoMap, // Create columns. auto* leftColumn = new SimpleColumn(leftTableColName.schema, leftTableColName.table, leftTableColName.column); - auto* rightColumn = new SimpleColumn(rightTableColName.schema, rightTableColName.table, rightTableColName.column); @@ -2508,13 +3031,10 @@ void createPostJoinFilters(const JobInfo& jobInfo, TableInfoMap& tableInfoMap, // Create an eq operator. SOP eqPredicateOperator(new PredicateOperator("=")); - // Set a type. eqPredicateOperator->setOpType(leftColumn->resultType(), rightColumn->resultType()); - // Create a post join filter. SimpleFilter* joinFilter = new SimpleFilter(eqPredicateOperator, leftColumn, rightColumn); - postJoinFilters.push_back(joinFilter); // Erase keys from fColsInExp2. @@ -2532,10 +3052,24 @@ void createPostJoinFilters(const JobInfo& jobInfo, TableInfoMap& tableInfoMap, ++rightKeyIndex; } } + + if (jobInfo.trace) + { + if (postJoinFilters.size()) + { + cout << "Post join filters created." << endl; + for (auto* filter : postJoinFilters) + cout << filter->toString() << endl; + } + else + { + std::cout << "Post join filters were not created." << std::endl; + } + } } SP_JoinInfo joinToLargeTable(uint32_t large, TableInfoMap& tableInfoMap, JobInfo& jobInfo, - vector& joinOrder, JoinEdges& edgesToTransform) + vector& joinOrder, std::map& joinEdgesToRestore) { vector smallSides; tableInfoMap[large].fVisited = true; @@ -2553,7 +3087,7 @@ SP_JoinInfo joinToLargeTable(uint32_t large, TableInfoMap& tableInfoMap, JobInfo if (tableInfoMap[*i].fVisited == false) { cId = *i; - smallSides.push_back(joinToLargeTable(*i, tableInfoMap, jobInfo, joinOrder, edgesToTransform)); + smallSides.push_back(joinToLargeTable(*i, tableInfoMap, jobInfo, joinOrder, joinEdgesToRestore)); tableSet.insert(tableInfoMap[*i].fJoinedTables.begin(), tableInfoMap[*i].fJoinedTables.end()); } @@ -2742,7 +3276,6 @@ SP_JoinInfo joinToLargeTable(uint32_t large, TableInfoMap& tableInfoMap, JobInfo dl->OID(large); outJsa.outAdd(spdl); thjs->outputAssociation(outJsa); - thjs->configSmallSideRG(smallSideRGs, tableNames); thjs->configLargeSideRG(tableInfoMap[large].fRowGroup); thjs->configJoinKeyIndex(jointypes, typeless, smallKeyIndices, largeKeyIndices); @@ -2825,12 +3358,13 @@ SP_JoinInfo joinToLargeTable(uint32_t large, TableInfoMap& tableInfoMap, JobInfo readyExpSteps.insert(readyExpSteps.end(), jobInfo.outerJoinExpressions.begin(), jobInfo.outerJoinExpressions.end()); + // Check if we have a `join edges` to restore as post join filter for result rowgroup. PostJoinFilterKeys postJoinFilterKeys; - if (edgesToTransform.size()) - matchEdgesInRowGroup(jobInfo, rg, edgesToTransform, postJoinFilterKeys); + if (joinEdgesToRestore.size()) + matchEdgesInResultRowGroup(jobInfo, rg, joinEdgesToRestore, postJoinFilterKeys); // check additional compares for semi-join. - if (readyExpSteps.size() > 0 || postJoinFilterKeys.size() > 0) + if (readyExpSteps.size() || postJoinFilterKeys.size()) { // tables have additional comparisons map correlateTables; // index in thjs @@ -2856,7 +3390,7 @@ SP_JoinInfo joinToLargeTable(uint32_t large, TableInfoMap& tableInfoMap, JobInfo } } - if (readyExpSteps.size() > 0 && correlateTables.size() > 0) + if (readyExpSteps.size() && correlateTables.size()) { // separate additional compare for each table pair JobStepVector::iterator eit = readyExpSteps.begin(); @@ -2932,22 +3466,15 @@ SP_JoinInfo joinToLargeTable(uint32_t large, TableInfoMap& tableInfoMap, JobInfo thjs->setJoinFilterInputRG(rg); } - // normal expression if any. - if (readyExpSteps.size() > 0 || postJoinFilterKeys.size() > 0) + // Normal expression or post join filters. + if (readyExpSteps.size() || postJoinFilterKeys.size()) { - // add the expression steps in where clause can be solved by this join to bps - ParseTree* pt = NULL; - std::vector postJoinFilters; - createPostJoinFilters(jobInfo, tableInfoMap, postJoinFilterKeys, keyToIndexMap, postJoinFilters); - - if (jobInfo.trace) - { - cout << "Filters created " << endl; - for (auto* filter : postJoinFilters) - cout << filter->toString() << endl; - } + if (postJoinFilterKeys.size()) + createPostJoinFilters(jobInfo, tableInfoMap, postJoinFilterKeys, keyToIndexMap, postJoinFilters); + // Add the expression steps in where clause can be solved by this join to bps. + ParseTree* pt = NULL; for (auto* joinFilter : postJoinFilters) { if (pt == nullptr) @@ -2965,7 +3492,6 @@ SP_JoinInfo joinToLargeTable(uint32_t large, TableInfoMap& tableInfoMap, JobInfo } JobStepVector::iterator eit = readyExpSteps.begin(); - for (; eit != readyExpSteps.end(); eit++) { // map the input column index @@ -3108,7 +3634,7 @@ void joinTablesInOrder(uint32_t largest, JobStepVector& joinSteps, TableInfoMap& JobInfo& jobInfo, vector& joinOrder) { // populate the tableInfo for join - map joinInfoMap; // + std::map joinInfoMap; // // > // large priority: @@ -3117,7 +3643,6 @@ void joinTablesInOrder(uint32_t largest, JobStepVector& joinSteps, TableInfoMap& // 1 - can be on either large or small side; // 2 - must be on large side. map> joinStepMap; - BatchPrimitive* bps = NULL; SubAdapterStep* tsas = NULL; TupleHashJoinStep* thjs = NULL; @@ -3141,14 +3666,52 @@ void joinTablesInOrder(uint32_t largest, JobStepVector& joinSteps, TableInfoMap& thjs = dynamic_cast(spjs.get()); TupleBPS* tbps = dynamic_cast(spjs.get()); - if (*i == largest) + /* We have to specify for `TupleHasJoinStep` associated with key to be restored, to be on a large side, + to avoid situation with multiple small sides, when CS tries to merge join steps. + + 1. Consider the following join graph: + t2 + / \ + t1 -- t3 + + {t3, t1} has a max weight, we choose that edge to remove from join graph. + + 2. Join graph after {t3, t1} edge was removed: + t2 + / \ + t1 t3 + + a. It's legal for CS to schedule this joins as one step join as follow: + join(small sides {t1, t3}, large side {t2}) + + b. Instead of generating two steps join as follow: + t1_t2 = join(small sides {t1}, large side {t2}) + join(small sides {t1_t2}, large side {t3)} + + In case of `a` we unable to implement a join with a multiple keys. + */ + + if (jobInfo.tablesForLargeSide.count(*i)) + { + const auto tableWeight = jobInfo.tablesForLargeSide[*i]; + joinStepMap[*i] = make_pair(spjs, tableWeight); + } + else if (*i == largest) + { joinStepMap[*i] = make_pair(spjs, 2); + } else if (tbps || thjs) + { joinStepMap[*i] = make_pair(spjs, 1); + } else if (tsas) + { joinStepMap[*i] = make_pair(spjs, 0); + } else + { joinStepMap[*i] = make_pair(spjs, -1); + } } // sort the join steps based on join ID. @@ -3200,6 +3763,10 @@ void joinTablesInOrder(uint32_t largest, JobStepVector& joinSteps, TableInfoMap& updateJoinSides(small, large, joinInfoMap, smallSides, tableInfoMap, jobInfo); + // This is a table for multiple join edges, always a stream table. + if (joinStepMap[large].second > 2) + umstream = true; + if (find(joinedTable.begin(), joinedTable.end(), small) == joinedTable.end()) joinedTable.push_back(small); @@ -3333,13 +3900,15 @@ void joinTablesInOrder(uint32_t largest, JobStepVector& joinSteps, TableInfoMap& } } - for (vector::iterator i = smallSides.begin(); i != smallSides.end(); i++) + uint32_t smallSideIndex = 0; + std::map joinIdIndexMap; + + for (vector::iterator i = smallSides.begin(); i != smallSides.end(); i++, smallSideIndex++) { JoinInfo* info = i->get(); smallSideDLs.push_back(info->fDl); smallSideRGs.push_back(info->fRowGroup); jointypes.push_back(info->fJoinData.fTypes[0]); - typeless.push_back(info->fJoinData.fTypeless); vector smallIndices; vector largeIndices; @@ -3356,6 +3925,10 @@ void joinTablesInOrder(uint32_t largest, JobStepVector& joinSteps, TableInfoMap& largeIndices.push_back(getKeyIndex(*k2, largeSideRG)); } + // Try to restore `circular join edge` if possible. + tryToRestoreJoinEdges(jobInfo, info, largeSideRG, smallIndices, largeIndices, traces, joinIdIndexMap, + smallSideIndex); + typeless.push_back(info->fJoinData.fTypeless); smallKeyIndices.push_back(smallIndices); largeKeyIndices.push_back(largeIndices); @@ -3437,7 +4010,6 @@ void joinTablesInOrder(uint32_t largest, JobStepVector& joinSteps, TableInfoMap& dl->OID(large); outJsa.outAdd(spdl); thjs->outputAssociation(outJsa); - thjs->configSmallSideRG(smallSideRGs, tableNames); thjs->configLargeSideRG(joinInfoMap[large]->fRowGroup); thjs->configJoinKeyIndex(jointypes, typeless, smallKeyIndices, largeKeyIndices); @@ -3456,7 +4028,6 @@ void joinTablesInOrder(uint32_t largest, JobStepVector& joinSteps, TableInfoMap& inJsa.outAdd(smallSideDLs, startPos); thjs->inputAssociation(inJsa); thjs->setLargeSideDLIndex(inJsa.outSize() - 1); - thjs->addSmallSideRG(smallSideRGs, tableNames); thjs->addJoinKeyIndex(jointypes, typeless, smallKeyIndices, largeKeyIndices); } @@ -3478,9 +4049,7 @@ void joinTablesInOrder(uint32_t largest, JobStepVector& joinSteps, TableInfoMap& cout << "RowGroup join result: " << endl << rg.toString() << endl << endl; } - // on clause filter association - map joinIdIndexMap; - + // The map for in clause filter. for (size_t i = 0; i < smallSides.size(); i++) { if (smallSides[i]->fJoinData.fJoinId > 0) @@ -3520,7 +4089,7 @@ void joinTablesInOrder(uint32_t largest, JobStepVector& joinSteps, TableInfoMap& // for on clause condition, need check join ID if (ready && exp->associatedJoinId() != 0) { - map::iterator x = joinIdIndexMap.find(exp->associatedJoinId()); + auto x = joinIdIndexMap.find(exp->associatedJoinId()); ready = (x != joinIdIndexMap.end()); } @@ -3654,7 +4223,7 @@ void joinTablesInOrder(uint32_t largest, JobStepVector& joinSteps, TableInfoMap& e->updateInputIndex(keyToIndexMap, jobInfo); // short circuit on clause expressions - map::iterator x = joinIdIndexMap.find(e->associatedJoinId()); + auto x = joinIdIndexMap.find(e->associatedJoinId()); if (x != joinIdIndexMap.end()) { @@ -3761,13 +4330,12 @@ void joinTablesInOrder(uint32_t largest, JobStepVector& joinSteps, TableInfoMap& } inline void joinTables(JobStepVector& joinSteps, TableInfoMap& tableInfoMap, JobInfo& jobInfo, - vector& joinOrder, JoinEdges& edgesToTransform, - const bool overrideLargeSideEstimate) + vector& joinOrder, const bool overrideLargeSideEstimate) { uint32_t largestTable = getLargestTable(jobInfo, tableInfoMap, overrideLargeSideEstimate); if (jobInfo.outerOnTable.size() == 0) - joinToLargeTable(largestTable, tableInfoMap, jobInfo, joinOrder, edgesToTransform); + joinToLargeTable(largestTable, tableInfoMap, jobInfo, joinOrder, jobInfo.joinEdgesToRestore); else joinTablesInOrder(largestTable, joinSteps, tableInfoMap, jobInfo, joinOrder); } @@ -4397,9 +4965,8 @@ void associateTupleJobSteps(JobStepVector& querySteps, JobStepVector& projectSte projectSteps.clear(); deliverySteps.clear(); - JoinEdges edgesToTransform; // Check if the tables and joins can be used to construct a spanning tree. - spanningTreeCheck(tableInfoMap, joinSteps, jobInfo, edgesToTransform); + spanningTreeCheck(tableInfoMap, joinSteps, jobInfo); // 1. combine job steps for each table TableInfoMap::iterator mit; @@ -4410,7 +4977,7 @@ void associateTupleJobSteps(JobStepVector& querySteps, JobStepVector& projectSte // 2. join the combined steps together to form the spanning tree vector joinOrder; - joinTables(joinSteps, tableInfoMap, jobInfo, joinOrder, edgesToTransform, overrideLargeSideEstimate); + joinTables(joinSteps, tableInfoMap, jobInfo, joinOrder, overrideLargeSideEstimate); // 3. put the steps together for (vector::iterator i = joinOrder.begin(); i != joinOrder.end(); ++i) diff --git a/dbcon/joblist/jlf_tuplejoblist.h b/dbcon/joblist/jlf_tuplejoblist.h index 879260a4e..6de096934 100644 --- a/dbcon/joblist/jlf_tuplejoblist.h +++ b/dbcon/joblist/jlf_tuplejoblist.h @@ -128,19 +128,31 @@ void orExpresssion(const execplan::Operator* op, JobInfo& jobInfo); // union the queries and return the tuple union step SJSTEP unionQueries(JobStepVector& queries, uint64_t distinctUnionNum, JobInfo& jobInfo); +// Used for join graph analysis. +// WHITE - node is not processed. +// GREY - node is in process. +// BLACK - node is done. +enum class JoinTableColor +{ + WHITE, + GREY, + BLACK +}; + struct JoinTableNode { - bool fVisited; + JoinTableColor fTableColor; uint32_t fParent; std::vector fAdjacentList; - JoinTableNode() : fVisited(false), fParent(-1) + JoinTableNode() : fTableColor(JoinTableColor::WHITE), fParent(UINT_MAX) { } }; using JoinGraph = std::map; -using JoinEdges = std::set>; -using Cycles = std::vector>>; -using Cycle = std::vector>; -using PostJoinFilterKeys = std::vector, std::vector>>; +using JoinEdge = std::pair; +using JoinEdges = std::set; +using Cycle = std::vector; +using Cycles = std::vector>; +using PostJoinFilterKeys = std::vector>>; } // namespace joblist diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp index b333db630..df716524b 100644 --- a/dbcon/joblist/tuplehashjoin.cpp +++ b/dbcon/joblist/tuplehashjoin.cpp @@ -1092,7 +1092,8 @@ const string TupleHashJoinStep::toString() const { ostringstream oss; size_t idlsz = fInputJobStepAssociation.outSize(); - idbassert(idlsz > 1); + // Avoid assertion on empty `TupleHashJoinStep`. + idbassert(idlsz > 1 || idlsz == 0); oss << "TupleHashJoinStep ses:" << fSessionId << " st:" << fStepId; oss << omitOidInDL; @@ -1184,6 +1185,7 @@ void TupleHashJoinStep::configJoinKeyIndex(const vector& jt, const vec { joinTypes.insert(joinTypes.begin(), jt.begin(), jt.end()); typelessJoin.insert(typelessJoin.begin(), typeless.begin(), typeless.end()); + smallSideKeys.insert(smallSideKeys.begin(), smallkey.begin(), smallkey.end()); largeSideKeys.insert(largeSideKeys.begin(), largekey.begin(), largekey.end()); #ifdef JLF_DEBUG diff --git a/mysql-test/columnstore/basic/r/mcol-4699.result b/mysql-test/columnstore/basic/r/mcol-4699.result new file mode 100644 index 000000000..7ead83468 --- /dev/null +++ b/mysql-test/columnstore/basic/r/mcol-4699.result @@ -0,0 +1,133 @@ +DROP DATABASE IF EXISTS mcol4699; +CREATE DATABASE mcol4699; +USE mcol4699; +create table t1 (a int, b int) engine=columnstore; +create table t2 (a int, b int) engine=columnstore; +create table t3 (a int, b int) engine=columnstore; +create table t4 (a int, b int) engine=columnstore; +create table t5 (a int, b int) engine=columnstore; +create table t6 (a int, b int) engine=columnstore; +create table t7 (a int, b int) engine=columnstore; +create table t8 (a int, b int) engine=columnstore; +create table t9 (a int, b int) engine=columnstore; +insert into t1 values (1, 3), (2, 3), (3, 4); +insert into t2 values (1, 2), (2, 4), (4, 5); +insert into t3 values (1, 2), (2, 3), (3, 4), (4, 5); +insert into t4 values (1, 3); +insert into t5 values (1, 2), (3, 4); +insert into t6 values (1, 2), (3, 4); +insert into t7 values (1, 3); +insert into t8 values (1, 3); +insert into t9 values (1, 2); +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b); +a b a b a b +1 3 1 2 1 2 +NULL NULL NULL NULL 2 3 +NULL NULL NULL NULL 3 4 +NULL NULL NULL NULL 4 5 +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b) left join t4 on (t3.a = t4.a); +a b a b a b a b +1 3 1 2 1 2 1 3 +NULL NULL NULL NULL 2 3 NULL NULL +NULL NULL NULL NULL 3 4 NULL NULL +NULL NULL NULL NULL 4 5 NULL NULL +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b) where (t3.a = 1 or t3.a = 3); +a b a b a b +1 3 1 2 1 2 +NULL NULL NULL NULL 3 4 +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b) where (t3.a > 1); +a b a b a b +NULL NULL NULL NULL 2 3 +NULL NULL NULL NULL 3 4 +NULL NULL NULL NULL 4 5 +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t2.a = t3.a) left join t4 on (t3.a = t4.a and t1.b = t4.b) order by t3.a; +a b a b a b a b +1 3 1 2 1 2 1 3 +2 3 2 4 2 3 NULL NULL +NULL NULL NULL NULL 3 4 NULL NULL +NULL NULL NULL NULL 4 5 NULL NULL +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t2.a = t3.a) left join t4 on (t3.a = t4.a and t2.b = t4.b) order by t3.a; +a b a b a b a b +1 3 1 2 1 2 NULL NULL +2 3 2 4 2 3 NULL NULL +NULL NULL NULL NULL 3 4 NULL NULL +NULL NULL NULL NULL 4 5 NULL NULL +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b and t3.a > t1.a); +a b a b a b +NULL NULL NULL NULL 1 2 +NULL NULL NULL NULL 2 3 +NULL NULL NULL NULL 3 4 +NULL NULL NULL NULL 4 5 +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b and t3.a > t2.a and t3.a > t1.a); +a b a b a b +NULL NULL NULL NULL 1 2 +NULL NULL NULL NULL 2 3 +NULL NULL NULL NULL 3 4 +NULL NULL NULL NULL 4 5 +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b and t3.b > t2.a and t3.a < t1.b); +a b a b a b +1 3 1 2 1 2 +NULL NULL NULL NULL 2 3 +NULL NULL NULL NULL 3 4 +NULL NULL NULL NULL 4 5 +select * from t1 left join t2 on (t1.a = t2.a) left join t3 on (t2.a = t3.a) left join t4 on (t4.a = t3.a) left join t5 on (t5.a = t2.a) left join t6 on (t5.a = t6.a and t6.a = t4.a); +a b a b a b a b a b a b +1 3 1 2 1 2 1 3 1 2 1 2 +2 3 2 4 2 3 NULL NULL NULL NULL NULL NULL +3 4 NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b) left join t4 on (t3.a = t4.a and t2.a = t4.a) order by t3.a; +a b a b a b a b +1 3 1 2 1 2 1 3 +NULL NULL NULL NULL 2 3 NULL NULL +NULL NULL NULL NULL 3 4 NULL NULL +NULL NULL NULL NULL 4 5 NULL NULL +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b) right join t4 on (t3.a = t4.a and t2.a = t4.a) order by t3.a; +a b a b a b a b +1 3 1 2 1 2 1 3 +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a) left join t4 on (t3.a = t4.a and t1.a = t4.a and t2.a = t4.a); +a b a b a b a b +1 3 1 2 1 2 1 3 +2 3 2 4 2 3 NULL NULL +NULL NULL NULL NULL 3 4 NULL NULL +NULL NULL NULL NULL 4 5 NULL NULL +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a) right join t4 on (t3.a = t4.a and t1.a = t4.a and t2.a = t4.a); +a b a b a b a b +1 3 1 2 1 2 1 3 +select * from t1 left join t2 on (t1.a = t2.a) left join t3 on (t2.a = t3.a) left join t4 on (t4.a = t3.a) left join t5 on (t5.a = t2.a) left join t6 on (t5.a = t6.a and t6.a = t4.a) left join t7 on (t7.a = t3.a) left join t8 on (t8.a = t7.a and t8.a = t2.a); +a b a b a b a b a b a b a b a b +1 3 1 2 1 2 1 3 1 2 1 2 1 3 1 3 +2 3 2 4 2 3 NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL +3 4 NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b) left join t4 on (t3.a = t4.a and t2.a = t4.a) left join t5 on (t4.a = t5.a and t3.b = t5.b); +a b a b a b a b a b +1 3 1 2 1 2 1 3 1 2 +NULL NULL NULL NULL 2 3 NULL NULL NULL NULL +NULL NULL NULL NULL 3 4 NULL NULL NULL NULL +NULL NULL NULL NULL 4 5 NULL NULL NULL NULL +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b) left join t4 on (t3.a = t4.a and t2.a = t4.a) right join t5 on (t4.a = t5.a and t3.b = t5.b); +a b a b a b a b a b +1 3 1 2 1 2 1 3 1 2 +NULL NULL NULL NULL NULL NULL NULL NULL 3 4 +select * from t1 left join t2 on (t1.a = t2.a) left join t3 on (t2.a = t3.a) left join t4 on (t4.a = t3.a) left join t5 on (t5.a = t2.a) left join t6 on (t5.a = t6.a and t6.a = t4.a) left join t7 on (t7.a = t3.a) left join t8 on (t8.a = t7.a and t8.a = t2.a) left join t9 on (t7.a = t9.a and t4.a = t9.a); +a b a b a b a b a b a b a b a b a b +1 3 1 2 1 2 1 3 1 2 1 2 1 3 1 3 1 2 +2 3 2 4 2 3 NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL +3 4 NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b) left join t4 on (t3.a = t4.a and t2.a = t4.a) right join t5 on (t4.a = t5.a and t3.b = t5.b) right join t6 on (t5.a = t6.a and t4.a = t6.a); +a b a b a b a b a b a b +1 3 1 2 1 2 1 3 1 2 1 2 +NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL 3 4 +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b) left join t4 on (t3.a = t4.a and t2.a = t4.a) right join t5 on (t4.a = t5.a and t3.b = t5.b) left join t6 on (t5.a = t6.a and t4.a = t6.a); +a b a b a b a b a b a b +1 3 1 2 1 2 1 3 1 2 1 2 +NULL NULL NULL NULL NULL NULL NULL NULL 3 4 NULL NULL +drop table t1; +drop table t2; +drop table t3; +drop table t4; +drop table t5; +drop table t6; +drop table t7; +drop table t8; +drop table t9; +DROP DATABASE mcol4699; diff --git a/mysql-test/columnstore/basic/t/mcol-4699.test b/mysql-test/columnstore/basic/t/mcol-4699.test new file mode 100644 index 000000000..251ed5a9e --- /dev/null +++ b/mysql-test/columnstore/basic/t/mcol-4699.test @@ -0,0 +1,70 @@ + +#-- source ../include/have_columnstore.inc + +--disable_warnings +DROP DATABASE IF EXISTS mcol4699; +--enable_warnings + +CREATE DATABASE mcol4699; + +USE mcol4699; + +create table t1 (a int, b int) engine=columnstore; +create table t2 (a int, b int) engine=columnstore; +create table t3 (a int, b int) engine=columnstore; +create table t4 (a int, b int) engine=columnstore; +create table t5 (a int, b int) engine=columnstore; +create table t6 (a int, b int) engine=columnstore; +create table t7 (a int, b int) engine=columnstore; +create table t8 (a int, b int) engine=columnstore; +create table t9 (a int, b int) engine=columnstore; + +insert into t1 values (1, 3), (2, 3), (3, 4); +insert into t2 values (1, 2), (2, 4), (4, 5); +insert into t3 values (1, 2), (2, 3), (3, 4), (4, 5); +insert into t4 values (1, 3); +insert into t5 values (1, 2), (3, 4); +insert into t6 values (1, 2), (3, 4); +insert into t7 values (1, 3); +insert into t8 values (1, 3); +insert into t9 values (1, 2); + +# 1 cycle. +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b); +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b) left join t4 on (t3.a = t4.a); +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b) where (t3.a = 1 or t3.a = 3); +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b) where (t3.a > 1); +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t2.a = t3.a) left join t4 on (t3.a = t4.a and t1.b = t4.b) order by t3.a; +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t2.a = t3.a) left join t4 on (t3.a = t4.a and t2.b = t4.b) order by t3.a; +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b and t3.a > t1.a); +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b and t3.a > t2.a and t3.a > t1.a); +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b and t3.b > t2.a and t3.a < t1.b); +select * from t1 left join t2 on (t1.a = t2.a) left join t3 on (t2.a = t3.a) left join t4 on (t4.a = t3.a) left join t5 on (t5.a = t2.a) left join t6 on (t5.a = t6.a and t6.a = t4.a); + +# 2 cycles. +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b) left join t4 on (t3.a = t4.a and t2.a = t4.a) order by t3.a; +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b) right join t4 on (t3.a = t4.a and t2.a = t4.a) order by t3.a; +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a) left join t4 on (t3.a = t4.a and t1.a = t4.a and t2.a = t4.a); +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a) right join t4 on (t3.a = t4.a and t1.a = t4.a and t2.a = t4.a); +select * from t1 left join t2 on (t1.a = t2.a) left join t3 on (t2.a = t3.a) left join t4 on (t4.a = t3.a) left join t5 on (t5.a = t2.a) left join t6 on (t5.a = t6.a and t6.a = t4.a) left join t7 on (t7.a = t3.a) left join t8 on (t8.a = t7.a and t8.a = t2.a); + +# 3 cycles. +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b) left join t4 on (t3.a = t4.a and t2.a = t4.a) left join t5 on (t4.a = t5.a and t3.b = t5.b); +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b) left join t4 on (t3.a = t4.a and t2.a = t4.a) right join t5 on (t4.a = t5.a and t3.b = t5.b); +select * from t1 left join t2 on (t1.a = t2.a) left join t3 on (t2.a = t3.a) left join t4 on (t4.a = t3.a) left join t5 on (t5.a = t2.a) left join t6 on (t5.a = t6.a and t6.a = t4.a) left join t7 on (t7.a = t3.a) left join t8 on (t8.a = t7.a and t8.a = t2.a) left join t9 on (t7.a = t9.a and t4.a = t9.a); + +# 4 cycles. +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b) left join t4 on (t3.a = t4.a and t2.a = t4.a) right join t5 on (t4.a = t5.a and t3.b = t5.b) right join t6 on (t5.a = t6.a and t4.a = t6.a); +select * from t1 inner join t2 on (t1.a = t2.a) right join t3 on (t1.a = t3.a and t2.b = t3.b) left join t4 on (t3.a = t4.a and t2.a = t4.a) right join t5 on (t4.a = t5.a and t3.b = t5.b) left join t6 on (t5.a = t6.a and t4.a = t6.a); + +drop table t1; +drop table t2; +drop table t3; +drop table t4; +drop table t5; +drop table t6; +drop table t7; +drop table t8; +drop table t9; + +DROP DATABASE mcol4699; diff --git a/utils/loggingcpp/ErrorMessage.txt b/utils/loggingcpp/ErrorMessage.txt index d918e2ff7..8de5a2272 100755 --- a/utils/loggingcpp/ErrorMessage.txt +++ b/utils/loggingcpp/ErrorMessage.txt @@ -22,7 +22,6 @@ 1000 ERR_MISS_JOIN %1% not joined. 1001 ERR_NON_SUPPORTED_FUNCTION Function '%1%' isn't supported. 1002 ERR_INCOMPATIBLE_JOIN %1% incompatible column type specified for join condition. -1003 ERR_CIRCULAR_JOIN Circular joins are not supported. 1004 ERR_MIX_JOIN Mixed %1% JOIN is not supported. 1005 ERR_UPDATE_SUB update with subselect in select clause is currently not supported in Columnstore. 1006 ERR_DATATYPE_NOT_SUPPORT Function called with unsupported datatype.