You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-08 14:22:09 +03:00
feat(joblist): better dot graphs that represents joblist steps execution tree.
This commit is contained in:
@@ -23,7 +23,12 @@
|
||||
#include <iostream>
|
||||
using namespace std;
|
||||
|
||||
#include <cstddef>
|
||||
#include <iterator>
|
||||
#include <sstream>
|
||||
|
||||
#include "joblist.h"
|
||||
#include "jobstep.h"
|
||||
#include "primitivestep.h"
|
||||
#include "subquerystep.h"
|
||||
#include "windowfunctionstep.h"
|
||||
@@ -44,302 +49,423 @@ using namespace joblist;
|
||||
|
||||
namespace jlf_graphics
|
||||
{
|
||||
ostream& writeDotCmds(ostream& dotFile, const JobStepVector& query, const JobStepVector& project)
|
||||
std::string generateDotFileName(const std::string& prefix)
|
||||
{
|
||||
// Graphic view draw
|
||||
dotFile << "digraph G {" << endl;
|
||||
JobStepVector::iterator qsi;
|
||||
JobStepVector::iterator psi;
|
||||
int ctn = 0;
|
||||
ostringstream oss;
|
||||
struct timeval tvbuf;
|
||||
gettimeofday(&tvbuf, 0);
|
||||
struct tm tmbuf;
|
||||
localtime_r(reinterpret_cast<time_t*>(&tvbuf.tv_sec), &tmbuf);
|
||||
oss << prefix << setfill('0') << setw(4) << (tmbuf.tm_year + 1900) << setw(2) << (tmbuf.tm_mon + 1)
|
||||
<< setw(2) << (tmbuf.tm_mday) << setw(2) << (tmbuf.tm_hour) << setw(2) << (tmbuf.tm_min) << setw(2)
|
||||
<< (tmbuf.tm_sec) << setw(6) << (tvbuf.tv_usec) << ".dot";
|
||||
return oss.str();
|
||||
}
|
||||
|
||||
// merge in the subquery steps
|
||||
JobStepVector querySteps = query;
|
||||
JobStepVector projectSteps = project;
|
||||
JobStepVector GraphGeneratorInterface::extractSubquerySteps(const SJSTEP& sqs)
|
||||
{
|
||||
SubQueryStep* subquery = NULL;
|
||||
qsi = querySteps.begin();
|
||||
JobStepVector res;
|
||||
auto* subQueryStep = dynamic_cast<SubQueryStep*>(sqs.get());
|
||||
|
||||
while (qsi != querySteps.end())
|
||||
if (subQueryStep)
|
||||
{
|
||||
if ((subquery = dynamic_cast<SubQueryStep*>(qsi->get())) != NULL)
|
||||
JobStepVector subQuerySteps;
|
||||
auto& stepsBeforeRecursion = subQueryStep->subJoblist()->querySteps();
|
||||
for (auto& step : stepsBeforeRecursion)
|
||||
{
|
||||
querySteps.erase(qsi);
|
||||
JobStepVector subSteps = subquery->subJoblist()->querySteps();
|
||||
querySteps.insert(querySteps.end(), subSteps.begin(), subSteps.end());
|
||||
qsi = querySteps.begin();
|
||||
auto steps = extractJobSteps(step);
|
||||
subQuerySteps.insert(subQuerySteps.end(), steps.begin(), steps.end());
|
||||
}
|
||||
res.insert(res.end(), subQuerySteps.begin(), subQuerySteps.end());
|
||||
}
|
||||
res.push_back(sqs);
|
||||
return res;
|
||||
}
|
||||
|
||||
// This f() is recursive to handle nested subqueries
|
||||
JobStepVector GraphGeneratorInterface::extractJobSteps(const SJSTEP& umbrella)
|
||||
{
|
||||
JobStepVector res;
|
||||
if (typeid(*umbrella) == typeid(SubAdapterStep))
|
||||
{
|
||||
auto* subAdapterStep = dynamic_cast<SubAdapterStep*>(umbrella.get());
|
||||
assert(subAdapterStep);
|
||||
auto subQuerySteps = extractSubquerySteps(subAdapterStep->subStep());
|
||||
res.insert(res.end(), subQuerySteps.begin(), subQuerySteps.end());
|
||||
res.push_back(umbrella);
|
||||
}
|
||||
|
||||
else if (typeid(*umbrella) == typeid(SubQueryStep))
|
||||
{
|
||||
auto subQuerySteps = extractSubquerySteps(umbrella);
|
||||
res.insert(res.end(), subQuerySteps.begin(), subQuerySteps.end());
|
||||
}
|
||||
else
|
||||
{
|
||||
qsi++;
|
||||
}
|
||||
res.push_back(umbrella);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
for (qsi = querySteps.begin(); qsi != querySteps.end(); ctn++, qsi++)
|
||||
std::string getLadderRepr(const JobStepVector& steps, const std::vector<size_t>& tabsToPretify)
|
||||
{
|
||||
// if (dynamic_cast<OrDelimiter*>(qsi->get()) != NULL)
|
||||
// continue;
|
||||
std::ostringstream oss;
|
||||
assert(tabsToPretify.size() == steps.size());
|
||||
// Tabs are in the reverse order of the steps
|
||||
auto tabsIt = tabsToPretify.begin();
|
||||
// Reverse the order of the steps to draw the graph from top to bottom
|
||||
for (auto s = steps.rbegin(); s != steps.rend(); ++s, ++tabsIt)
|
||||
{
|
||||
oss << std::string(*tabsIt, '\t');
|
||||
oss << (*s)->extendedInfo() << std::endl;
|
||||
}
|
||||
return oss.str();
|
||||
}
|
||||
|
||||
uint16_t stepidIn = qsi->get()->stepId();
|
||||
dotFile << stepidIn << " [label=\"st_" << stepidIn << " ";
|
||||
std::string GraphGeneratorInterface::getGraphNode(const SJSTEP& stepPtr)
|
||||
{
|
||||
auto& step = *stepPtr;
|
||||
uint16_t stepidIn = step.stepId();
|
||||
std::ostringstream oss;
|
||||
oss << stepidIn << " [label=\"st_" << stepidIn << " ";
|
||||
|
||||
if (typeid(*(qsi->get())) == typeid(pColStep))
|
||||
if (typeid(step) == typeid(pColStep))
|
||||
{
|
||||
dotFile << "(" << qsi->get()->tableOid() << "/" << qsi->get()->oid() << ")"
|
||||
<< "\"";
|
||||
dotFile << " shape=ellipse";
|
||||
oss << "(" << step.tableOid() << "/" << step.oid() << ")" << "\"";
|
||||
oss << " shape=ellipse";
|
||||
}
|
||||
else if (typeid(*(qsi->get())) == typeid(pColScanStep))
|
||||
else if (typeid(step) == typeid(pColScanStep))
|
||||
{
|
||||
dotFile << "(" << qsi->get()->tableOid() << "/" << qsi->get()->oid() << ")"
|
||||
<< "\"";
|
||||
dotFile << " shape=box";
|
||||
oss << "(" << step.tableOid() << "/" << step.oid() << ")" << "\"";
|
||||
oss << " shape=box";
|
||||
}
|
||||
// else if (typeid(*(qsi->get())) == typeid(HashJoinStep) ||
|
||||
// typeid(*(qsi->get())) == typeid(StringHashJoinStep))
|
||||
// {
|
||||
// dotFile << "\"";
|
||||
// dotFile << " shape=diamond";
|
||||
// }
|
||||
else if (typeid(*(qsi->get())) == typeid(TupleHashJoinStep))
|
||||
{
|
||||
dotFile << "\"";
|
||||
dotFile << " shape=diamond peripheries=2";
|
||||
}
|
||||
// else if (typeid(*(qsi->get())) == typeid(UnionStep) ||
|
||||
// typeid(*(qsi->get())) == typeid(TupleUnion) )
|
||||
else if (typeid(*(qsi->get())) == typeid(TupleUnion))
|
||||
{
|
||||
dotFile << "\"";
|
||||
dotFile << " shape=triangle";
|
||||
}
|
||||
else if (typeid(*(qsi->get())) == typeid(pDictionaryStep))
|
||||
{
|
||||
dotFile << "\"";
|
||||
dotFile << " shape=trapezium";
|
||||
}
|
||||
else if (typeid(*(qsi->get())) == typeid(FilterStep))
|
||||
{
|
||||
dotFile << "\"";
|
||||
dotFile << " shape=house orientation=180";
|
||||
}
|
||||
// else if (typeid(*(qsi->get())) == typeid(ReduceStep))
|
||||
// {
|
||||
// dotFile << "\"";
|
||||
// dotFile << " shape=triangle orientation=180";
|
||||
// }
|
||||
// else if (typeid(*(qsi->get())) == typeid(BatchPrimitiveStep) || typeid(*(qsi->get())) ==
|
||||
//typeid(TupleBPS))
|
||||
else if (typeid(*(qsi->get())) == typeid(TupleBPS))
|
||||
else if (typeid(step) == typeid(TupleBPS))
|
||||
{
|
||||
bool isTuple = false;
|
||||
BatchPrimitive* bps = dynamic_cast<BatchPrimitive*>(qsi->get());
|
||||
BatchPrimitive* bps = dynamic_cast<BatchPrimitive*>(stepPtr.get());
|
||||
|
||||
if (dynamic_cast<TupleBPS*>(bps) != 0)
|
||||
isTuple = true;
|
||||
|
||||
dotFile << "(" << bps->tableOid() << "/" << bps->oid();
|
||||
oss << "(" << bps->tableOid() << "/" << bps->oid() << "/" << bps->alias();
|
||||
OIDVector projectOids = bps->getProjectOids();
|
||||
|
||||
if (projectOids.size() > 0)
|
||||
{
|
||||
dotFile << "\\l";
|
||||
dotFile << "PC: ";
|
||||
oss << "\\l";
|
||||
oss << "PC: ";
|
||||
}
|
||||
|
||||
for (unsigned int i = 0; i < projectOids.size(); i++)
|
||||
{
|
||||
dotFile << projectOids[i] << " ";
|
||||
oss << projectOids[i] << " ";
|
||||
|
||||
if ((i + 1) % 3 == 0)
|
||||
dotFile << "\\l";
|
||||
oss << "\\l";
|
||||
}
|
||||
|
||||
dotFile << ")\"";
|
||||
dotFile << " shape=box style=bold";
|
||||
oss << ")\"";
|
||||
oss << " shape=box style=bold";
|
||||
|
||||
if (isTuple)
|
||||
dotFile << " peripheries=2";
|
||||
oss << " peripheries=2";
|
||||
}
|
||||
else if (typeid(*(qsi->get())) == typeid(CrossEngineStep))
|
||||
else if (typeid(step) == typeid(CrossEngineStep))
|
||||
{
|
||||
BatchPrimitive* bps = dynamic_cast<BatchPrimitive*>(qsi->get());
|
||||
dotFile << "(" << bps->alias() << ")\"";
|
||||
dotFile << " shape=box style=bold";
|
||||
dotFile << " peripheries=2";
|
||||
CrossEngineStep* cej = dynamic_cast<CrossEngineStep*>(stepPtr.get());
|
||||
oss << "(" << cej->tableName() << "/" << cej->tableAlias() << ")\"";
|
||||
oss << " shape=cylinder style=bold";
|
||||
}
|
||||
else if (typeid(*(qsi->get())) == typeid(TupleAggregateStep))
|
||||
else if (typeid(step) == typeid(TupleHashJoinStep))
|
||||
{
|
||||
dotFile << "\"";
|
||||
dotFile << " shape=triangle orientation=180";
|
||||
oss << "\"";
|
||||
oss << " shape=diamond peripheries=2";
|
||||
}
|
||||
else if (typeid(*(qsi->get())) == typeid(TupleAnnexStep))
|
||||
else if (typeid(step) == typeid(TupleUnion))
|
||||
{
|
||||
dotFile << "\"";
|
||||
dotFile << " shape=star";
|
||||
oss << "\"";
|
||||
oss << " shape=triangle";
|
||||
}
|
||||
else if (typeid(*(qsi->get())) == typeid(WindowFunctionStep))
|
||||
else if (typeid(step) == typeid(pDictionaryStep))
|
||||
{
|
||||
dotFile << "\"";
|
||||
dotFile << " shape=triangle orientation=180";
|
||||
dotFile << " peripheries=2";
|
||||
oss << "\"";
|
||||
oss << " shape=trapezium";
|
||||
}
|
||||
else if (typeid(step) == typeid(FilterStep))
|
||||
{
|
||||
oss << "\"";
|
||||
oss << " shape=invhouse";
|
||||
}
|
||||
|
||||
else if (typeid(step) == typeid(TupleAggregateStep))
|
||||
{
|
||||
oss << "\"";
|
||||
oss << " shape=invtriangle";
|
||||
}
|
||||
else if (typeid(step) == typeid(TupleAnnexStep))
|
||||
{
|
||||
oss << "\"";
|
||||
oss << " shape=star";
|
||||
}
|
||||
else if (typeid(step) == typeid(WindowFunctionStep))
|
||||
{
|
||||
oss << "\"";
|
||||
oss << " shape=invtriangle";
|
||||
oss << " peripheries=2";
|
||||
}
|
||||
else if (typeid(step) == typeid(SubAdapterStep))
|
||||
{
|
||||
oss << "\"";
|
||||
oss << " shape=polygon";
|
||||
oss << " peripheries=2";
|
||||
}
|
||||
else if (typeid(step) == typeid(SubQueryStep))
|
||||
{
|
||||
oss << "\"";
|
||||
oss << " shape=polygon";
|
||||
}
|
||||
// else if (typeid(*(qsi->get())) == typeid(AggregateFilterStep))
|
||||
// {
|
||||
// dotFile << "\"";
|
||||
// dotFile << " shape=hexagon peripheries=2 style=bold";
|
||||
// }
|
||||
// else if (typeid(*(qsi->get())) == typeid(BucketReuseStep))
|
||||
// {
|
||||
// dotFile << "(" << qsi->get()->tableOid() << "/" << qsi->get()->oid() << ")" << "\"";
|
||||
// dotFile << " shape=box style=dashed";
|
||||
// }
|
||||
else
|
||||
dotFile << "\"";
|
||||
|
||||
dotFile << "]" << endl;
|
||||
|
||||
for (unsigned int i = 0; i < qsi->get()->outputAssociation().outSize(); i++)
|
||||
{
|
||||
RowGroupDL* dlout = qsi->get()->outputAssociation().outAt(i)->rowGroupDL();
|
||||
ptrdiff_t dloutptr = (ptrdiff_t)dlout;
|
||||
oss << "\"";
|
||||
}
|
||||
|
||||
for (unsigned int k = 0; k < querySteps.size(); k++)
|
||||
oss << "]" << endl;
|
||||
return oss.str();
|
||||
}
|
||||
|
||||
std::pair<size_t, std::string> GraphGeneratorInterface::getTabsAndEdges(
|
||||
const JobStepVector& querySteps, const JobStepVector& projectSteps, const SJSTEP& stepPtr,
|
||||
const std::vector<size_t>& tabsToPretify)
|
||||
{
|
||||
uint16_t stepidOut = querySteps[k].get()->stepId();
|
||||
JobStepAssociation queryInputSA = querySteps[k].get()->inputAssociation();
|
||||
auto& step = *stepPtr;
|
||||
uint16_t stepidIn = step.stepId();
|
||||
std::ostringstream oss;
|
||||
size_t tab = 0;
|
||||
|
||||
for (unsigned int i = 0; i < step.outputAssociation().outSize(); i++)
|
||||
{
|
||||
ptrdiff_t dloutptr = 0;
|
||||
auto* dlout = step.outputAssociation().outAt(i)->rowGroupDL();
|
||||
uint32_t numConsumers = step.outputAssociation().outAt(i)->getNumConsumers();
|
||||
|
||||
if (dlout)
|
||||
{
|
||||
dloutptr = (ptrdiff_t)dlout;
|
||||
}
|
||||
|
||||
for (auto it = querySteps.rbegin(); it != querySteps.rend(); ++it)
|
||||
{
|
||||
auto& otherStep = *it;
|
||||
// Reverse order idx
|
||||
auto otherIdx = std::distance(querySteps.rbegin(), it);
|
||||
uint16_t stepidOut = otherStep.get()->stepId();
|
||||
JobStepAssociation queryInputSA = otherStep.get()->inputAssociation();
|
||||
|
||||
for (unsigned int j = 0; j < queryInputSA.outSize(); j++)
|
||||
{
|
||||
RowGroupDL* dlin = queryInputSA.outAt(j)->rowGroupDL();
|
||||
ptrdiff_t dlinptr = (ptrdiff_t)dlin;;
|
||||
ptrdiff_t dlinptr = 0;
|
||||
auto* dlin = queryInputSA.outAt(j)->rowGroupDL();
|
||||
|
||||
if (dlin)
|
||||
{
|
||||
dlinptr = (ptrdiff_t)dlin;
|
||||
}
|
||||
|
||||
if ((ptrdiff_t)dloutptr == (ptrdiff_t)dlinptr)
|
||||
{
|
||||
dotFile << stepidIn << " -> " << stepidOut;
|
||||
oss << stepidIn << " -> " << stepidOut;
|
||||
|
||||
if (dlin)
|
||||
{
|
||||
oss << " [label=\"[" << numConsumers << "]\"]" << endl;
|
||||
tab = tabsToPretify[otherIdx] + 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (psi = projectSteps.begin(); psi < projectSteps.end(); psi++)
|
||||
for (auto& otherProjectStep : projectSteps)
|
||||
{
|
||||
uint16_t stepidOut = psi->get()->stepId();
|
||||
JobStepAssociation projectInputSA = psi->get()->inputAssociation();
|
||||
uint16_t stepidOut = otherProjectStep->stepId();
|
||||
JobStepAssociation projectInputSA = otherProjectStep->inputAssociation();
|
||||
|
||||
for (unsigned int j = 0; j < projectInputSA.outSize(); j++)
|
||||
{
|
||||
RowGroupDL* dlin = projectInputSA.outAt(j)->rowGroupDL();
|
||||
ptrdiff_t dlinptr = (ptrdiff_t)dlin;;
|
||||
ptrdiff_t dlinptr = 0;
|
||||
auto* dlin = projectInputSA.outAt(j)->rowGroupDL();
|
||||
|
||||
if (dlin)
|
||||
dlinptr = (ptrdiff_t)dlin;
|
||||
|
||||
if (dloutptr == dlinptr)
|
||||
{
|
||||
dotFile << stepidIn << " -> " << stepidOut;
|
||||
oss << stepidIn << " -> " << stepidOut;
|
||||
|
||||
if (dlin)
|
||||
{
|
||||
oss << " [label=\"[" << numConsumers << "]\"]" << endl;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return {tab, oss.str()};
|
||||
}
|
||||
|
||||
for (psi = projectSteps.begin(), ctn = 0; psi != projectSteps.end(); ctn++, psi++)
|
||||
std::string GraphGeneratorInterface::getGraphProjectionNode(SJSTEP& step)
|
||||
{
|
||||
uint16_t stepidIn = psi->get()->stepId();
|
||||
dotFile << stepidIn << " [label=\"st_" << stepidIn << " ";
|
||||
std::ostringstream oss;
|
||||
uint16_t stepidIn = step->stepId();
|
||||
oss << stepidIn << " [label=\"st_" << stepidIn << " ";
|
||||
|
||||
if (typeid(*(psi->get())) == typeid(pColStep))
|
||||
if (typeid(*(step)) == typeid(pColStep))
|
||||
{
|
||||
dotFile << "(" << psi->get()->tableOid() << "/" << psi->get()->oid() << ")"
|
||||
<< "\"";
|
||||
dotFile << " shape=ellipse";
|
||||
oss << "(" << step->tableOid() << "/" << step->oid() << ")" << "\"";
|
||||
oss << " shape=ellipse";
|
||||
}
|
||||
else if (typeid(*(psi->get())) == typeid(pColScanStep))
|
||||
else if (typeid(*(step)) == typeid(pColScanStep))
|
||||
{
|
||||
dotFile << "(" << psi->get()->tableOid() << "/" << psi->get()->oid() << ")"
|
||||
<< "\"";
|
||||
dotFile << " shape=box";
|
||||
oss << "(" << step->tableOid() << "/" << step->oid() << ")" << "\"";
|
||||
oss << " shape=box";
|
||||
}
|
||||
else if (typeid(*(psi->get())) == typeid(pDictionaryStep))
|
||||
else if (typeid(*(step)) == typeid(pDictionaryStep))
|
||||
{
|
||||
dotFile << "\"";
|
||||
dotFile << " shape=trapezium";
|
||||
oss << "\"";
|
||||
oss << " shape=trapezium";
|
||||
}
|
||||
else if (typeid(*(psi->get())) == typeid(PassThruStep))
|
||||
else if (typeid(*(step)) == typeid(PassThruStep))
|
||||
{
|
||||
dotFile << "(" << psi->get()->tableOid() << "/" << psi->get()->oid() << ")"
|
||||
<< "\"";
|
||||
dotFile << " shape=octagon";
|
||||
oss << "(" << step->tableOid() << "/" << step->oid() << ")" << "\"";
|
||||
oss << " shape=octagon";
|
||||
}
|
||||
// else if (typeid(*(psi->get())) == typeid(BatchPrimitiveStep) || typeid(*(psi->get())) ==
|
||||
//typeid(TupleBPS))
|
||||
else if (typeid(*(psi->get())) == typeid(TupleBPS))
|
||||
else if (typeid(*(step)) == typeid(TupleBPS))
|
||||
{
|
||||
bool isTuple = false;
|
||||
BatchPrimitive* bps = dynamic_cast<BatchPrimitive*>(psi->get());
|
||||
BatchPrimitive* bps = dynamic_cast<BatchPrimitive*>(step.get());
|
||||
|
||||
if (dynamic_cast<TupleBPS*>(bps) != 0)
|
||||
isTuple = true;
|
||||
|
||||
dotFile << "(" << bps->tableOid() << ":\\l";
|
||||
oss << "(" << bps->tableOid() << ":\\l";
|
||||
OIDVector projectOids = bps->getProjectOids();
|
||||
|
||||
for (unsigned int i = 0; i < projectOids.size(); i++)
|
||||
{
|
||||
dotFile << projectOids[i] << " ";
|
||||
oss << projectOids[i] << " ";
|
||||
|
||||
if ((i + 1) % 3 == 0)
|
||||
dotFile << "\\l";
|
||||
oss << "\\l";
|
||||
}
|
||||
|
||||
dotFile << ")\"";
|
||||
dotFile << " shape=box style=bold";
|
||||
oss << ")\"";
|
||||
oss << " shape=box style=bold";
|
||||
|
||||
if (isTuple)
|
||||
dotFile << " peripheries=2";
|
||||
oss << " peripheries=2";
|
||||
}
|
||||
else if (typeid(*(qsi->get())) == typeid(CrossEngineStep))
|
||||
else if (typeid(*(step)) == typeid(CrossEngineStep))
|
||||
{
|
||||
BatchPrimitive* bps = dynamic_cast<BatchPrimitive*>(qsi->get());
|
||||
dotFile << "(" << bps->alias() << ")\"";
|
||||
dotFile << " shape=box style=bold";
|
||||
dotFile << " peripheries=2";
|
||||
CrossEngineStep* cej = dynamic_cast<CrossEngineStep*>(step.get());
|
||||
oss << "(" << cej->tableName() << "/" << cej->tableAlias() << ")\"";
|
||||
oss << " shape=cylinder style=bold";
|
||||
}
|
||||
else
|
||||
dotFile << "\"";
|
||||
oss << "\"";
|
||||
|
||||
dotFile << "]" << endl;
|
||||
oss << "]" << endl;
|
||||
return oss.str();
|
||||
}
|
||||
|
||||
for (unsigned int i = 0; i < psi->get()->outputAssociation().outSize(); i++)
|
||||
std::string GraphGeneratorInterface::getProjectionEdges(JobStepVector& steps, SJSTEP& step,
|
||||
const std::size_t ctn)
|
||||
{
|
||||
RowGroupDL* dlout = psi->get()->outputAssociation().outAt(i)->rowGroupDL();
|
||||
ptrdiff_t dloutptr = (ptrdiff_t)dlout;
|
||||
uint16_t stepidIn = step->stepId();
|
||||
std::ostringstream oss;
|
||||
|
||||
for (unsigned int k = ctn + 1; k < projectSteps.size(); k++)
|
||||
for (unsigned int i = 0; i < step->outputAssociation().outSize(); i++)
|
||||
{
|
||||
uint16_t stepidOut = projectSteps[k].get()->stepId();
|
||||
JobStepAssociation projectInputSA = projectSteps[k].get()->inputAssociation();
|
||||
ptrdiff_t dloutptr = 0;
|
||||
auto* dlout = step->outputAssociation().outAt(i)->rowGroupDL();
|
||||
uint32_t numConsumers = step->outputAssociation().outAt(i)->getNumConsumers();
|
||||
|
||||
if (dlout)
|
||||
{
|
||||
dloutptr = (ptrdiff_t)dlout;
|
||||
}
|
||||
|
||||
for (auto k = ctn + 1; k < steps.size(); k++)
|
||||
{
|
||||
uint16_t stepidOut = steps[k].get()->stepId();
|
||||
JobStepAssociation projectInputSA = steps[k].get()->inputAssociation();
|
||||
|
||||
for (unsigned int j = 0; j < projectInputSA.outSize(); j++)
|
||||
{
|
||||
RowGroupDL* dlin = projectInputSA.outAt(j)->rowGroupDL();
|
||||
ptrdiff_t dlinptr = (ptrdiff_t)dlin;
|
||||
ptrdiff_t dlinptr = 0;
|
||||
auto* dlin = projectInputSA.outAt(j)->rowGroupDL();
|
||||
|
||||
if (dlin)
|
||||
dlinptr = (ptrdiff_t)dlin;
|
||||
|
||||
if ((ptrdiff_t)dloutptr == (ptrdiff_t)dlinptr)
|
||||
{
|
||||
dotFile << stepidIn << " -> " << stepidOut;
|
||||
oss << stepidIn << " -> " << stepidOut;
|
||||
|
||||
if (dlin)
|
||||
{
|
||||
oss << " [label=\"[" << numConsumers << "]\"]" << endl;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return oss.str();
|
||||
}
|
||||
|
||||
dotFile << "}" << endl;
|
||||
std::string GraphGeneratorInterface::writeDotCmds()
|
||||
{
|
||||
// Graphic view draw
|
||||
std::ostringstream oss;
|
||||
oss << "digraph G {" << endl;
|
||||
|
||||
return dotFile;
|
||||
// merge in the subquery steps
|
||||
JobStepVector querySteps;
|
||||
for (auto& step : query)
|
||||
{
|
||||
auto steps = extractJobSteps(step);
|
||||
querySteps.insert(querySteps.end(), steps.begin(), steps.end());
|
||||
}
|
||||
|
||||
JobStepVector projectSteps = project;
|
||||
std::vector<size_t> tabsToPretify;
|
||||
// Reverse the order of the steps to draw the graph from top to bottom
|
||||
for (auto it = querySteps.rbegin(); it != querySteps.rend(); ++it)
|
||||
{
|
||||
auto& step = *it;
|
||||
oss << getGraphNode(step);
|
||||
auto [tab, graphEdges] = getTabsAndEdges(querySteps, projectSteps, step, tabsToPretify);
|
||||
tabsToPretify.push_back(tab);
|
||||
oss << graphEdges;
|
||||
}
|
||||
|
||||
for (auto psi = projectSteps.begin(); psi != projectSteps.end(); ++psi)
|
||||
{
|
||||
auto& step = *psi;
|
||||
oss << getGraphProjectionNode(step);
|
||||
auto idx = std::distance(projectSteps.begin(), psi);
|
||||
oss << getProjectionEdges(projectSteps, step, idx);
|
||||
}
|
||||
|
||||
oss << "}" << endl;
|
||||
|
||||
auto ladderRepr = getLadderRepr(querySteps, tabsToPretify);
|
||||
cout << endl;
|
||||
cout << ladderRepr;
|
||||
|
||||
return oss.str();
|
||||
}
|
||||
|
||||
} // end namespace jlf_graphics
|
||||
|
||||
|
||||
#ifdef __clang__
|
||||
#pragma clang diagnostic pop
|
||||
#endif
|
||||
|
@@ -21,8 +21,6 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <iostream>
|
||||
|
||||
#include "joblist.h"
|
||||
|
||||
namespace jlf_graphics
|
||||
@@ -30,7 +28,53 @@ namespace jlf_graphics
|
||||
/** Format a stream of dot commands
|
||||
* Format a stream of dot commands
|
||||
*/
|
||||
std::ostream& writeDotCmds(std::ostream& dotFile, const joblist::JobStepVector& querySteps,
|
||||
const joblist::JobStepVector& projectSteps);
|
||||
|
||||
std::string getLadderRepr(const joblist::JobStepVector& steps, const std::vector<size_t>& tabsToPretify);
|
||||
std::string generateDotFileName(const std::string& prefix);
|
||||
|
||||
class GraphGeneratorInterface
|
||||
{
|
||||
public:
|
||||
GraphGeneratorInterface(const joblist::JobStepVector& query, const joblist::JobStepVector& project)
|
||||
: query(query), project(project){};
|
||||
|
||||
virtual ~GraphGeneratorInterface(){};
|
||||
|
||||
virtual std::string writeDotCmds();
|
||||
|
||||
private:
|
||||
virtual joblist::JobStepVector extractSubquerySteps(const joblist::SJSTEP& sqs);
|
||||
virtual joblist::JobStepVector extractJobSteps(const joblist::SJSTEP& umbrella);
|
||||
|
||||
virtual std::string getGraphNode(const joblist::SJSTEP& stepPtr);
|
||||
virtual std::pair<size_t, std::string> getTabsAndEdges(const joblist::JobStepVector& querySteps,
|
||||
const joblist::JobStepVector& projectSteps,
|
||||
const joblist::SJSTEP& stepPtr,
|
||||
const std::vector<size_t>& tabsToPretify);
|
||||
virtual std::string getGraphProjectionNode(joblist::SJSTEP& step);
|
||||
virtual std::string getProjectionEdges(joblist::JobStepVector& steps, joblist::SJSTEP& step,
|
||||
const std::size_t ctn);
|
||||
|
||||
const joblist::JobStepVector& query;
|
||||
const joblist::JobStepVector& project;
|
||||
};
|
||||
|
||||
class GraphGeneratorNoStats : public GraphGeneratorInterface
|
||||
{
|
||||
public:
|
||||
GraphGeneratorNoStats(const joblist::JobStepVector& query, const joblist::JobStepVector& project)
|
||||
: GraphGeneratorInterface(query, project){};
|
||||
~GraphGeneratorNoStats(){};
|
||||
};
|
||||
|
||||
class GraphGeneratorWStats : public GraphGeneratorInterface
|
||||
{
|
||||
public:
|
||||
GraphGeneratorWStats(const joblist::JobStepVector& query, const joblist::JobStepVector& project)
|
||||
: GraphGeneratorInterface(query, project){};
|
||||
~GraphGeneratorWStats(){};
|
||||
|
||||
private:
|
||||
};
|
||||
|
||||
} // namespace jlf_graphics
|
||||
|
@@ -19,6 +19,7 @@
|
||||
// $Id: joblist.cpp 9655 2013-06-25 23:08:13Z xlou $
|
||||
|
||||
// Cross engine needs to be at the top due to MySQL includes
|
||||
#include <algorithm>
|
||||
#define PREFER_MY_CONFIG_H
|
||||
#include "crossenginestep.h"
|
||||
#include "errorcodes.h"
|
||||
@@ -33,6 +34,7 @@ using namespace std;
|
||||
using namespace execplan;
|
||||
|
||||
#include "errorids.h"
|
||||
#include "jlf_graphics.h"
|
||||
#include "jobstep.h"
|
||||
#include "primitivestep.h"
|
||||
#include "subquerystep.h"
|
||||
@@ -570,530 +572,15 @@ void JobList::querySummary(bool extendedStats)
|
||||
void JobList::graph(uint32_t sessionID)
|
||||
{
|
||||
// Graphic view draw
|
||||
ostringstream oss;
|
||||
struct timeval tvbuf;
|
||||
gettimeofday(&tvbuf, 0);
|
||||
struct tm tmbuf;
|
||||
localtime_r(reinterpret_cast<time_t*>(&tvbuf.tv_sec), &tmbuf);
|
||||
oss << "jobstep_results." << setfill('0') << setw(4) << (tmbuf.tm_year + 1900) << setw(2)
|
||||
<< (tmbuf.tm_mon + 1) << setw(2) << (tmbuf.tm_mday) << setw(2) << (tmbuf.tm_hour) << setw(2)
|
||||
<< (tmbuf.tm_min) << setw(2) << (tmbuf.tm_sec) << setw(6) << (tvbuf.tv_usec) << ".dot";
|
||||
string jsrname(oss.str());
|
||||
// it's too late to set this here. ExeMgr has already returned ei to dm...
|
||||
// fExtendedInfo += "Graphs are in " + jsrname;
|
||||
auto jsrname = jlf_graphics::generateDotFileName("jobstep_results.");
|
||||
std::ofstream dotFile(jsrname.c_str(), std::ios::out);
|
||||
dotFile << "digraph G {" << std::endl;
|
||||
JobStepVector::iterator qsi;
|
||||
JobStepVector::iterator psi;
|
||||
DeliveredTableMap::iterator dsi;
|
||||
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
|
||||
CalpontSystemCatalog::TableColName tcn;
|
||||
uint64_t outSize = 0;
|
||||
uint64_t msgs = 0;
|
||||
uint64_t pio = 0;
|
||||
int ctn = 0;
|
||||
bool diskIo = false;
|
||||
uint64_t saveTime = 0;
|
||||
uint64_t loadTime = 0;
|
||||
|
||||
// merge in the subquery steps
|
||||
JobStepVector querySteps = fQuery;
|
||||
{
|
||||
SubQueryStep* subquery = NULL;
|
||||
qsi = querySteps.begin();
|
||||
|
||||
while (qsi != querySteps.end())
|
||||
{
|
||||
if ((subquery = dynamic_cast<SubQueryStep*>(qsi->get())) != NULL)
|
||||
{
|
||||
querySteps.erase(qsi);
|
||||
JobStepVector subSteps = subquery->subJoblist()->querySteps();
|
||||
querySteps.insert(querySteps.end(), subSteps.begin(), subSteps.end());
|
||||
qsi = querySteps.begin();
|
||||
}
|
||||
else
|
||||
{
|
||||
qsi++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (qsi = querySteps.begin(); qsi != querySteps.end(); ctn++, qsi++)
|
||||
{
|
||||
// HashJoinStep* hjs = 0;
|
||||
|
||||
// if (dynamic_cast<OrDelimiter*>(qsi->get()) != NULL)
|
||||
// continue;
|
||||
|
||||
// @bug 1042. clear column name first at each loop
|
||||
tcn.column = "";
|
||||
|
||||
uint16_t stepidIn = qsi->get()->stepId();
|
||||
dotFile << stepidIn << " [label=\"st_" << stepidIn << " ";
|
||||
|
||||
// @Bug 1033. colName was being called for dictionary steps that don't have column names.
|
||||
// Added if condition below.
|
||||
if (typeid(*(qsi->get())) == typeid(pColScanStep) || typeid(*(qsi->get())) == typeid(pColStep))
|
||||
tcn = csc->colName(qsi->get()->oid());
|
||||
|
||||
dotFile << "(";
|
||||
|
||||
if (!tcn.column.empty())
|
||||
dotFile << tcn.column << "/";
|
||||
|
||||
if (typeid(*(qsi->get())) == typeid(TupleBPS))
|
||||
{
|
||||
BatchPrimitive* bps = dynamic_cast<BatchPrimitive*>(qsi->get());
|
||||
OIDVector projectOids = bps->getProjectOids();
|
||||
|
||||
if (projectOids.size() > 0)
|
||||
{
|
||||
dotFile << "\\l";
|
||||
dotFile << "PC:";
|
||||
dotFile << "\\l";
|
||||
|
||||
for (unsigned int i = 0; i < projectOids.size(); i++)
|
||||
{
|
||||
tcn = csc->colName(projectOids[i]);
|
||||
|
||||
if (!tcn.column.empty())
|
||||
dotFile << tcn.column << " ";
|
||||
|
||||
if ((i + 1) % 3 == 0)
|
||||
dotFile << "\\l";
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
tcn = csc->colName(qsi->get()->oid());
|
||||
dotFile << tcn.column << "/";
|
||||
}
|
||||
}
|
||||
else if (typeid(*(qsi->get())) == typeid(CrossEngineStep))
|
||||
{
|
||||
tcn.schema = qsi->get()->schema();
|
||||
tcn.table = qsi->get()->alias();
|
||||
}
|
||||
|
||||
dotFile << JSTimeStamp::tsdiffstr(qsi->get()->dlTimes.EndOfInputTime(),
|
||||
qsi->get()->dlTimes.FirstReadTime())
|
||||
<< "s";
|
||||
|
||||
dotFile << ")";
|
||||
|
||||
// oracle predict card
|
||||
dotFile << "\\l#: " << (*qsi)->cardinality();
|
||||
|
||||
if (typeid(*(qsi->get())) == typeid(pColStep))
|
||||
{
|
||||
dotFile << "\""
|
||||
<< " shape=ellipse";
|
||||
}
|
||||
else if (typeid(*(qsi->get())) == typeid(pColScanStep))
|
||||
{
|
||||
dotFile << "\""
|
||||
<< " shape=box";
|
||||
}
|
||||
else if (typeid(*(qsi->get())) == typeid(TupleBPS))
|
||||
{
|
||||
BatchPrimitive* bps = dynamic_cast<BatchPrimitive*>(qsi->get());
|
||||
|
||||
// if BPS not run, a BucketReuseStep was substituted, so draw dashed
|
||||
if (bps->wasStepRun())
|
||||
{
|
||||
dotFile << "\""
|
||||
<< " shape=box style=bold";
|
||||
|
||||
if (typeid(*(qsi->get())) == typeid(TupleBPS))
|
||||
dotFile << " peripheries=2";
|
||||
}
|
||||
else
|
||||
dotFile << "\""
|
||||
<< " shape=box style=dashed";
|
||||
}
|
||||
else if (typeid(*(qsi->get())) == typeid(CrossEngineStep))
|
||||
{
|
||||
dotFile << "\""
|
||||
<< " shape=box style=dashed";
|
||||
}
|
||||
else if (typeid(*(qsi->get())) == typeid(TupleAggregateStep))
|
||||
{
|
||||
dotFile << "\""
|
||||
<< " shape=triangle orientation=180";
|
||||
}
|
||||
else if (typeid(*(qsi->get())) == typeid(TupleAnnexStep))
|
||||
{
|
||||
dotFile << "\""
|
||||
<< " shape=star";
|
||||
}
|
||||
else if (typeid(*(qsi->get())) == typeid(WindowFunctionStep))
|
||||
{
|
||||
dotFile << "\""
|
||||
<< " shape=triangle orientation=180 peripheries=2";
|
||||
}
|
||||
else if (typeid(*(qsi->get())) == typeid(TupleHashJoinStep))
|
||||
{
|
||||
dotFile << "\"";
|
||||
dotFile << " shape=diamond style=dashed peripheries=2";
|
||||
}
|
||||
else if (typeid(*(qsi->get())) == typeid(TupleUnion))
|
||||
{
|
||||
dotFile << "\""
|
||||
<< " shape=triangle";
|
||||
}
|
||||
else if (typeid(*(qsi->get())) == typeid(pDictionaryStep))
|
||||
{
|
||||
dotFile << "\""
|
||||
<< " shape=trapezium";
|
||||
}
|
||||
else if (typeid(*(qsi->get())) == typeid(FilterStep))
|
||||
{
|
||||
dotFile << "\""
|
||||
<< " shape=house orientation=180";
|
||||
}
|
||||
else if (typeid(*(qsi->get())) == typeid(TupleBPS))
|
||||
{
|
||||
dotFile << "\""
|
||||
<< " shape=box style=bold";
|
||||
dotFile << " peripheries=2";
|
||||
}
|
||||
else if (typeid(*(qsi->get())) == typeid(CrossEngineStep))
|
||||
{
|
||||
dotFile << "\""
|
||||
<< " shape=box style=bold";
|
||||
dotFile << " peripheries=2";
|
||||
}
|
||||
else
|
||||
dotFile << "\"";
|
||||
|
||||
dotFile << "]" << endl;
|
||||
|
||||
// msgsRecived, physicalIO, cacheIO
|
||||
msgs = qsi->get()->msgsRcvdCount();
|
||||
pio = qsi->get()->phyIOCount();
|
||||
|
||||
for (unsigned int i = 0; i < qsi->get()->outputAssociation().outSize(); i++)
|
||||
{
|
||||
ptrdiff_t dloutptr = 0;
|
||||
RowGroupDL* dlout;
|
||||
// TupleDataList* tdl;
|
||||
|
||||
if ((dlout = qsi->get()->outputAssociation().outAt(i)->rowGroupDL()))
|
||||
{
|
||||
dloutptr = (ptrdiff_t)dlout;
|
||||
outSize = dlout->totalSize();
|
||||
diskIo = dlout->totalDiskIoTime(saveTime, loadTime);
|
||||
}
|
||||
|
||||
// if HashJoinStep, determine if output fifo was cached to disk
|
||||
bool hjTempDiskFlag = false;
|
||||
|
||||
for (unsigned int k = 0; k < querySteps.size(); k++)
|
||||
{
|
||||
uint16_t stepidOut = querySteps[k].get()->stepId();
|
||||
JobStepAssociation queryInputSA = querySteps[k].get()->inputAssociation();
|
||||
|
||||
for (unsigned int j = 0; j < queryInputSA.outSize(); j++)
|
||||
{
|
||||
ptrdiff_t dlinptr = 0;
|
||||
RowGroupDL* dlin = queryInputSA.outAt(j)->rowGroupDL();
|
||||
|
||||
if (dlin)
|
||||
dlinptr = (ptrdiff_t)dlin;
|
||||
|
||||
if (dloutptr == dlinptr)
|
||||
{
|
||||
dotFile << stepidIn << " -> " << stepidOut;
|
||||
|
||||
dotFile << " [label=\" r: " << outSize;
|
||||
|
||||
if (hjTempDiskFlag)
|
||||
{
|
||||
dotFile << "*";
|
||||
}
|
||||
|
||||
dotFile << "\\l";
|
||||
|
||||
if (msgs != 0)
|
||||
{
|
||||
dotFile << " m: " << msgs << "\\l";
|
||||
|
||||
if (typeid(*(qsi->get())) == typeid(TupleBPS))
|
||||
{
|
||||
dotFile << " b: " << qsi->get()->blockTouched() << "\\l";
|
||||
}
|
||||
|
||||
dotFile << " p: " << pio << "\\l";
|
||||
}
|
||||
|
||||
if (diskIo == true)
|
||||
{
|
||||
dotFile << " wr: " << saveTime << "s\\l";
|
||||
dotFile << " rd: " << loadTime << "s\\l";
|
||||
}
|
||||
|
||||
dotFile << "\"]" << endl;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (psi = fProject.begin(); psi < fProject.end(); psi++)
|
||||
{
|
||||
uint16_t stepidOut = psi->get()->stepId();
|
||||
JobStepAssociation projectInputSA = psi->get()->inputAssociation();
|
||||
|
||||
for (unsigned int j = 0; j < projectInputSA.outSize(); j++)
|
||||
{
|
||||
RowGroupDL* dlin = projectInputSA.outAt(j)->rowGroupDL();
|
||||
ptrdiff_t dlinptr = (ptrdiff_t)dlin;
|
||||
|
||||
if (dloutptr == dlinptr)
|
||||
{
|
||||
dotFile << stepidIn << " -> " << stepidOut;
|
||||
|
||||
dotFile << " [label=\" r: " << outSize;
|
||||
|
||||
if (hjTempDiskFlag)
|
||||
{
|
||||
dotFile << "*";
|
||||
}
|
||||
|
||||
dotFile << "\\l";
|
||||
|
||||
if (msgs != 0)
|
||||
{
|
||||
dotFile << " m: " << msgs << "\\l";
|
||||
dotFile << " p: " << pio << "\\l";
|
||||
}
|
||||
|
||||
if (diskIo == true)
|
||||
{
|
||||
dotFile << " wr: " << saveTime << "s\\l";
|
||||
dotFile << " rd: " << loadTime << "s\\l";
|
||||
}
|
||||
|
||||
dotFile << "\"]" << endl;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//@Bug 921
|
||||
if (typeid(*(qsi->get())) == typeid(TupleBPS))
|
||||
{
|
||||
BatchPrimitive* bps = dynamic_cast<BatchPrimitive*>(qsi->get());
|
||||
CalpontSystemCatalog::OID tableOIDProject = bps->tableOid();
|
||||
|
||||
if (bps->getOutputType() == TABLE_BAND || bps->getOutputType() == ROW_GROUP)
|
||||
{
|
||||
outSize = bps->getRows();
|
||||
|
||||
for (dsi = fDeliveredTables.begin(); dsi != fDeliveredTables.end(); dsi++)
|
||||
{
|
||||
BatchPrimitive* bpsDelivery = dynamic_cast<BatchPrimitive*>((dsi->second).get());
|
||||
TupleHashJoinStep* thjDelivery = dynamic_cast<TupleHashJoinStep*>((dsi->second).get());
|
||||
|
||||
if (bpsDelivery)
|
||||
{
|
||||
CalpontSystemCatalog::OID tableOID = bpsDelivery->tableOid();
|
||||
dotFile << tableOID << " [label=" << bpsDelivery->alias() << " shape=plaintext]" << endl;
|
||||
JobStepAssociation deliveryInputSA = bpsDelivery->inputAssociation();
|
||||
|
||||
if (tableOIDProject == tableOID)
|
||||
{
|
||||
dotFile << stepidIn << " -> " << tableOID;
|
||||
|
||||
dotFile << " [label=\" r: " << outSize << "\\l";
|
||||
dotFile << " m: " << bpsDelivery->msgsRcvdCount() << "\\l";
|
||||
dotFile << " b: " << bpsDelivery->blockTouched() << "\\l";
|
||||
dotFile << " p: " << bpsDelivery->phyIOCount() << "\\l";
|
||||
dotFile << "\"]" << endl;
|
||||
}
|
||||
}
|
||||
else if (thjDelivery)
|
||||
{
|
||||
CalpontSystemCatalog::OID tableOID = thjDelivery->tableOid();
|
||||
dotFile << tableOID << " [label="
|
||||
<< "vtable"
|
||||
<< " shape=plaintext]" << endl;
|
||||
JobStepAssociation deliveryInputSA = thjDelivery->inputAssociation();
|
||||
|
||||
if (tableOIDProject == tableOID)
|
||||
{
|
||||
dotFile << stepidIn << " -> " << tableOID;
|
||||
|
||||
dotFile << " [label=\" r: " << outSize << "\\l";
|
||||
dotFile << " m: " << thjDelivery->msgsRcvdCount() << "\\l";
|
||||
dotFile << " b: " << thjDelivery->blockTouched() << "\\l";
|
||||
dotFile << " p: " << thjDelivery->phyIOCount() << "\\l";
|
||||
dotFile << "\"]" << endl;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (typeid(*(qsi->get())) == typeid(CrossEngineStep))
|
||||
{
|
||||
outSize = dynamic_cast<CrossEngineStep*>(qsi->get())->getRows();
|
||||
dotFile << "0"
|
||||
<< " [label=" << qsi->get()->alias() << " shape=plaintext]" << endl;
|
||||
dotFile << stepidIn << " -> 0";
|
||||
dotFile << " [label=\" r: " << outSize << "\\l";
|
||||
dotFile << "\"]" << endl;
|
||||
}
|
||||
}
|
||||
|
||||
for (psi = fProject.begin(), ctn = 0; psi != fProject.end(); ctn++, psi++)
|
||||
{
|
||||
tcn.column = "";
|
||||
uint16_t stepidIn = psi->get()->stepId();
|
||||
dotFile << stepidIn << " [label=\"st_" << stepidIn << " ";
|
||||
tcn = csc->colName(psi->get()->oid());
|
||||
dotFile << "(";
|
||||
BatchPrimitive* bps = 0;
|
||||
|
||||
if (typeid(*(psi->get())) == typeid(TupleBPS))
|
||||
{
|
||||
bps = dynamic_cast<BatchPrimitive*>(psi->get());
|
||||
OIDVector projectOids = bps->getProjectOids();
|
||||
|
||||
for (unsigned int i = 0; i < projectOids.size(); i++)
|
||||
{
|
||||
tcn = csc->colName(projectOids[i]);
|
||||
|
||||
if (!tcn.column.empty())
|
||||
{
|
||||
dotFile << tcn.column;
|
||||
|
||||
if (i != (projectOids.size() - 1))
|
||||
dotFile << "/ ";
|
||||
}
|
||||
|
||||
if ((i + 1) % 3 == 0)
|
||||
dotFile << "\\l";
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!tcn.column.empty())
|
||||
dotFile << tcn.column << "/";
|
||||
}
|
||||
|
||||
dotFile << JSTimeStamp::tsdiffstr(psi->get()->dlTimes.EndOfInputTime(),
|
||||
psi->get()->dlTimes.FirstReadTime())
|
||||
<< "s";
|
||||
dotFile << ")";
|
||||
|
||||
if (typeid(*(psi->get())) == typeid(pColStep))
|
||||
{
|
||||
dotFile << "\""
|
||||
<< " shape=ellipse";
|
||||
}
|
||||
else if (typeid(*(psi->get())) == typeid(pColScanStep))
|
||||
{
|
||||
dotFile << "\""
|
||||
<< " shape=box";
|
||||
}
|
||||
else if (typeid(*(psi->get())) == typeid(TupleBPS))
|
||||
{
|
||||
dotFile << "\""
|
||||
<< " shape=box style=bold";
|
||||
|
||||
if (typeid(*(psi->get())) == typeid(TupleBPS))
|
||||
dotFile << " peripheries=2";
|
||||
}
|
||||
else if (typeid(*(psi->get())) == typeid(pDictionaryStep))
|
||||
{
|
||||
dotFile << "\""
|
||||
<< " shape=trapezium";
|
||||
}
|
||||
else if (typeid(*(psi->get())) == typeid(PassThruStep))
|
||||
{
|
||||
dotFile << "\""
|
||||
<< " shape=octagon";
|
||||
}
|
||||
else if (typeid(*(psi->get())) == typeid(FilterStep))
|
||||
{
|
||||
dotFile << "\""
|
||||
<< " shape=house orientation=180";
|
||||
}
|
||||
else
|
||||
dotFile << "\"";
|
||||
|
||||
dotFile << "]" << endl;
|
||||
|
||||
// msgsRecived, physicalIO, cacheIO
|
||||
msgs = psi->get()->msgsRcvdCount();
|
||||
pio = psi->get()->phyIOCount();
|
||||
|
||||
CalpontSystemCatalog::OID tableOIDProject = 0;
|
||||
|
||||
if (bps)
|
||||
tableOIDProject = bps->tableOid();
|
||||
|
||||
//@Bug 921
|
||||
for (dsi = fDeliveredTables.begin(); dsi != fDeliveredTables.end(); dsi++)
|
||||
{
|
||||
BatchPrimitive* dbps = dynamic_cast<BatchPrimitive*>((dsi->second).get());
|
||||
|
||||
if (dbps)
|
||||
{
|
||||
outSize = dbps->getRows();
|
||||
CalpontSystemCatalog::OID tableOID = dbps->tableOid();
|
||||
dotFile << tableOID << " [label=" << dbps->alias() << " shape=plaintext]" << endl;
|
||||
JobStepAssociation deliveryInputSA = dbps->inputAssociation();
|
||||
|
||||
if (tableOIDProject == tableOID)
|
||||
{
|
||||
dotFile << stepidIn << " -> " << tableOID;
|
||||
|
||||
dotFile << " [label=\" r: " << outSize << "\\l";
|
||||
dotFile << " m: " << dbps->msgsRcvdCount() << "\\l";
|
||||
dotFile << " b: " << dbps->blockTouched() << "\\l";
|
||||
dotFile << " p: " << dbps->phyIOCount() << "\\l";
|
||||
dotFile << "\"]" << endl;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
dotFile << "}" << std::endl;
|
||||
dotFile.close();
|
||||
dotFile << jlf_graphics::GraphGeneratorWStats(fQuery, fProject).writeDotCmds();
|
||||
}
|
||||
|
||||
void JobList::validate() const
|
||||
{
|
||||
// uint32_t i;
|
||||
// DeliveredTableMap::const_iterator it;
|
||||
|
||||
/* Make sure there's at least one query step and that they're the right type */
|
||||
idbassert(fQuery.size() > 0);
|
||||
// for (i = 0; i < fQuery.size(); i++)
|
||||
// idbassert(dynamic_cast<BatchPrimitiveStep *>(fQuery[i].get()) ||
|
||||
// dynamic_cast<HashJoinStep *>(fQuery[i].get()) ||
|
||||
// dynamic_cast<UnionStep *>(fQuery[i].get()) ||
|
||||
// dynamic_cast<AggregateFilterStep *>(fQuery[i].get()) ||
|
||||
// dynamic_cast<BucketReuseStep *>(fQuery[i].get()) ||
|
||||
// dynamic_cast<pDictionaryScan *>(fQuery[i].get()) ||
|
||||
// dynamic_cast<FilterStep *>(fQuery[i].get()) ||
|
||||
// dynamic_cast<OrDelimiter *>(fQuery[i].get())
|
||||
// );
|
||||
//
|
||||
// /* Make sure there's at least one projected table and that they're the right type */
|
||||
// idbassert(fDeliveredTables.size() > 0);
|
||||
// for (i = 0; i < fProject.size(); i++)
|
||||
// idbassert(dynamic_cast<BatchPrimitiveStep *>(fProject[i].get()));
|
||||
//
|
||||
// /* Check that all JobSteps use the right status pointer */
|
||||
// for (i = 0; i < fQuery.size(); i++) {
|
||||
// idbassert(fQuery[i]->errorInfo().get() == errorInfo().get());
|
||||
// }
|
||||
// for (i = 0; i < fProject.size(); i++) {
|
||||
// idbassert(fProject[i]->errorInfo().get() == errorInfo().get());
|
||||
// }
|
||||
// for (it = fDeliveredTables.begin(); it != fDeliveredTables.end(); ++it) {
|
||||
// idbassert(it->second->errorInfo().get() == errorInfo().get());
|
||||
// }
|
||||
}
|
||||
|
||||
void TupleJobList::validate() const
|
||||
@@ -1209,4 +696,3 @@ void TupleJobList::abort()
|
||||
#ifdef __clang__
|
||||
#pragma clang diagnostic pop
|
||||
#endif
|
||||
|
||||
|
@@ -2174,18 +2174,10 @@ SJLP makeJobList_(CalpontExecutionPlan* cplan, ResourceManager* rm,
|
||||
}
|
||||
|
||||
oss << endl;
|
||||
gettimeofday(&stTime, 0);
|
||||
|
||||
struct tm tmbuf;
|
||||
localtime_r(&stTime.tv_sec, &tmbuf);
|
||||
ostringstream tms;
|
||||
tms << setfill('0') << setw(4) << (tmbuf.tm_year + 1900) << setw(2) << (tmbuf.tm_mon + 1) << setw(2)
|
||||
<< (tmbuf.tm_mday) << setw(2) << (tmbuf.tm_hour) << setw(2) << (tmbuf.tm_min) << setw(2)
|
||||
<< (tmbuf.tm_sec) << setw(6) << (stTime.tv_usec);
|
||||
string tmstr(tms.str());
|
||||
string jsrname("jobstep." + tmstr + ".dot");
|
||||
auto jsrname = jlf_graphics::generateDotFileName("jobstep.");
|
||||
ofstream dotFile(jsrname.c_str());
|
||||
jlf_graphics::writeDotCmds(dotFile, querySteps, projectSteps);
|
||||
dotFile << jlf_graphics::GraphGeneratorNoStats(querySteps, projectSteps).writeDotCmds();
|
||||
|
||||
char timestamp[80];
|
||||
ctime_r((const time_t*)&stTime.tv_sec, timestamp);
|
||||
|
@@ -147,6 +147,7 @@ SubAdapterStep::SubAdapterStep(SJSTEP& s, const JobInfo& jobInfo)
|
||||
, fOutputIterator(0)
|
||||
, fRunner(0)
|
||||
{
|
||||
fExtendedInfo = "SAS: ";
|
||||
fAlias = s->alias();
|
||||
fView = s->view();
|
||||
fInputJobStepAssociation = s->outputAssociation();
|
||||
|
Reference in New Issue
Block a user