1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
Leonid Fedorov 3919c541ac
New warnfixes (#2254)
* Fix clang warnings

* Remove vim tab guides

* initialize variables

* 'strncpy' output truncated before terminating nul copying as many bytes from a string as its length

* Fix ISO C++17 does not allow 'register' storage class specifier for outdated bison

* chars are unsigned on ARM, making `if (ival < 0)` always false

* chars are unsigned by default on ARM, so a comparison with -1 is always true
2022-02-17 13:08:58 +03:00

393 lines
8.8 KiB
C++

// $Id: sendplan.cpp 1739 2012-03-22 12:57:59Z pleblanc $
#include <sys/time.h>
#include <unistd.h>
#include <cstdlib>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
using namespace std;
#include "bytestream.h"
using namespace messageqcpp;
#include "calpontselectexecutionplan.h"
#include "sessionmanager.h"
using namespace execplan;
#include "calpontsystemcatalog.h"
using namespace execplan;
#include "tableband.h"
#include "joblist.h"
#include "joblistfactory.h"
using namespace joblist;
#include "configcpp.h"
#include "errorcodes.h"
#include "rowgroup.h"
using namespace rowgroup;
typedef CalpontSystemCatalog::OID OID;
namespace
{
/// Verbose-output flag. main() sets it from -v, and also forces it on for -d.
bool vflg;

/// Print the command-line synopsis and the per-option help text to stdout.
void usage()
{
  // One flush at the end is enough; the text itself is unchanged.
  std::cout << "usage: sendPlan [-v|h|d|B] [-t lvl] [-s sid] plan_file ...\n"
            << "-v verbose output\n"
            << "-t lvl set trace level to lvl\n"
            << "-s sid set session id to sid\n"
            << "-d display the query, but don't run it\n"
            << "-B Bob's personal preferred output format\n"
            << "-h display this help" << std::endl;
}

/// Return the elapsed time from *st to *et in (fractional) seconds.
///
/// @param st  start timestamp
/// @param et  end timestamp
/// @return    et - st, in seconds; negative when et precedes st
double tm_diff(const struct timeval* st, const struct timeval* et)
{
  const double sd = static_cast<double>(st->tv_sec) + static_cast<double>(st->tv_usec) / 1000000.0;
  const double ed = static_cast<double>(et->tv_sec) + static_cast<double>(et->tv_usec) / 1000000.0;
  return (ed - sd);
}

/// Split the full stats string into its printable part and the query start time.
///
/// The input is laid out as "<printable stats>|<tv_sec> <tv_usec>".
///
/// @param fullStatsString    complete stats string
/// @param printStatsString   receives everything before the first '|' (or the
///                           whole input when there is no '|')
/// @param queryRunStartTime  receives the parsed start time; both fields are
///                           zero when the delimiter or the numbers are missing
void parseStatsString(const std::string& fullStatsString, std::string& printStatsString,
                      struct timeval& queryRunStartTime)
{
  // Give the out-parameter a defined value up front: a failed or skipped
  // extraction below would otherwise leave a field untouched, and the caller
  // feeds this straight into tm_diff().
  queryRunStartTime.tv_sec = 0;
  queryRunStartTime.tv_usec = 0;

  const std::string::size_type delimPos = fullStatsString.find('|');
  printStatsString = fullStatsString.substr(0, delimPos);

  // Without a delimiter there is no time section. Do not fall through:
  // npos + 1 wraps to 0 and would re-parse the stats text itself as a time.
  if (delimPos == std::string::npos)
    return;

  std::istringstream startTimeString(fullStatsString.substr(delimPos + 1));
  startTimeString >> queryRunStartTime.tv_sec >> queryRunStartTime.tv_usec;
}
}  // namespace
int main(int argc, char** argv)
{
vflg = false;
uint32_t tlvl = 0;
bool dflg = false;
int c;
int32_t sid = -1;
bool Bflg = false;
opterr = 0;
while ((c = getopt(argc, argv, "vt:ds:Bh")) != EOF)
switch (c)
{
case 't': tlvl = static_cast<uint32_t>(strtoul(optarg, 0, 0)); break;
case 'v': vflg = true; break;
case 'd': dflg = true; break;
case 's': sid = static_cast<int32_t>(strtol(optarg, 0, 0)); break;
case 'B': Bflg = true; break;
case 'h':
case '?':
default:
usage();
return (c == 'h' ? 0 : 1);
break;
}
if (dflg)
vflg = true;
if ((argc - optind) < 1)
{
usage();
return 1;
}
ifstream inputf;
ByteStream bs;
ByteStream dbs;
ByteStream eoq;
ByteStream tbs;
ByteStream statsStream;
ByteStream::quadbyte q = 0;
eoq << q;
uint32_t sessionid;
time_t t;
SJLP jl;
DeliveredTableMap tm;
DeliveredTableMap::iterator iter;
DeliveredTableMap::iterator end;
CalpontSelectExecutionPlan csep;
struct timeval start_time;
struct timeval end_time;
MessageQueueClient* mqc = 0;
if (!dflg)
mqc = new MessageQueueClient("ExeMgr1");
if (sid == -1)
{
time(&t);
sessionid = static_cast<uint32_t>(t);
}
else
{
sessionid = static_cast<uint32_t>(sid);
}
sessionid &= 0x7fffffff;
logging::ErrorCodes errorCodes;
for (; optind < argc; optind++)
{
inputf.open(argv[optind]);
if (!inputf.good())
{
cerr << "error opening plan stream " << argv[optind] << endl;
return 1;
}
bs.reset();
inputf >> bs;
inputf.close();
csep.unserialize(bs);
csep.sessionID(sessionid);
SessionManager sm;
csep.verID(sm.verID());
csep.traceFlags(0);
ResourceManager* rm = ResourceManager::instance();
jl = JobListFactory::makeJobList(&csep, rm);
csep.traceFlags(tlvl);
if (vflg)
{
if (dflg)
cout << endl << "Query:" << endl;
else
{
cout << endl << "Session: " << sessionid << ", Sending Query";
if (Bflg)
cout << " (" << argv[optind] << ')';
cout << ':' << endl;
}
if (!Bflg)
cout << csep.data() << endl << endl;
}
if (dflg)
continue;
try
{
dbs.reset();
csep.serialize(dbs);
gettimeofday(&start_time, 0);
// try tuples first, but expect the worst...
bool expectTuples = false;
ByteStream tbs;
ByteStream::quadbyte tqb = 4;
tbs << tqb;
mqc->write(tbs);
// send the CSEP
mqc->write(dbs);
// read the response to the tuple request
tbs = mqc->read();
idbassert(tbs.length() == 4);
tbs >> tqb;
if (tqb == 4)
expectTuples = true;
if (!expectTuples)
cout << "Using TableBand I/F" << endl;
else
cout << "Using tuple I/F" << endl;
tm = jl->deliveredTables();
iter = tm.begin();
end = tm.end();
OID toid;
uint64_t rowTot;
bool reported = false;
bool needRGCtor = true;
while (iter != end)
{
toid = iter->first;
q = static_cast<ByteStream::quadbyte>(toid);
tbs.reset();
tbs << q;
mqc->write(tbs);
ByteStream tbbs;
TableBand tb;
RowGroup rg;
rowTot = 0;
uint16_t status = 0;
TableBand::VBA::size_type rc;
ofstream out;
for (;;)
{
tbbs = mqc->read();
#if 0
cout << tbbs.length() << endl;
out.open("bs1.dat");
idbassert(out.good());
out << tbbs;
out.close();
tbbs = mqc->read();
cout << tbbs.length() << endl;
out.open("bs2.dat");
idbassert(out.good());
out << tbbs;
out.close();
tbbs = mqc->read();
cout << tbbs.length() << endl;
out.open("bs3.dat");
idbassert(out.good());
out << tbbs;
out.close();
#endif
if (tbbs.length())
{
if (!expectTuples)
tb.unserialize(tbbs);
else
{
if (needRGCtor)
{
rg.deserialize(tbbs);
needRGCtor = false;
tbbs = mqc->read();
}
rg.setData((uint8_t*)tbbs.buf());
}
}
else
{
//@bug 1346
if (!status)
status = logging::makeJobListErr;
break;
}
if (!expectTuples)
{
rc = tb.getRowCount();
status = tb.getStatus();
}
else
{
rc = rg.getRowCount();
status = rg.getStatus();
if (rc == 0)
status = 0;
}
if (rc == 0)
break;
rowTot += rc;
}
BatchPrimitive* step = dynamic_cast<BatchPrimitive*>(iter->second.get());
if (vflg && step)
{
cout << "For table " << step->tableName();
if (!Bflg)
cout << " " << toid;
cout << ": read " << rowTot << " rows" << endl;
}
if (status && !reported)
{
cout << "### Query failed: " << errorCodes.errorString(status) << " Check crit.log\n";
reported = true;
}
if (!step && !reported)
{
cout << "### Query failed: Did not return project BatchPrimitive. Check crit.log\n";
reported = true;
}
++iter;
}
if (vflg)
{
gettimeofday(&end_time, 0);
cout << "Query time: " << fixed << setprecision(1) << tm_diff(&start_time, &end_time) << " secs"
<< endl;
//...Ask for query stats through special table id of 3
const OID TABLE_ID_TO_GET_QUERY_STATS = 3;
if (!Bflg)
cout << "Retrieving stats..." << endl;
toid = TABLE_ID_TO_GET_QUERY_STATS;
q = static_cast<ByteStream::quadbyte>(toid);
statsStream.reset();
statsStream << q;
mqc->write(statsStream);
ByteStream bs_statsString;
bs_statsString = mqc->read();
string statsString;
bs_statsString >> statsString;
string printStatsString;
struct timeval startRunTime;
parseStatsString(statsString, printStatsString, startRunTime);
cout << printStatsString << "; QuerySetupTime-" << tm_diff(&start_time, &startRunTime) << "secs"
<< endl;
}
//...Close this query/session
mqc->write(eoq);
jl.reset();
}
catch (const exception& ex)
{
cout << "### SendPlan caught an exception: " << ex.what() << endl;
}
}
// jl.reset();
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionid);
config::Config::deleteInstanceMap();
delete mqc;
return 0;
}