1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
2022-01-21 16:43:49 +00:00

1525 lines
39 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <string>
#include <stdexcept>
#include <typeinfo>
using namespace std;
#include <cppunit/extensions/HelperMacros.h>
#include <sstream>
#include <exception>
#include <iostream>
#include <unistd.h>
#include <cstdlib>
#include <pthread.h>
#include <values.h>
#include <signal.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include "configcpp.h"
using namespace config;
#include "messagequeue.h"
#include "bytestream.h"
using namespace messageqcpp;
#include "calpontselectexecutionplan.h"
#include "simplefilter.h"
#include "simplecolumn.h"
#include "expressionparser.h"
#include "constantcolumn.h"
#include "treenode.h"
#include "operator.h"
#include "arithmeticcolumn.h"
#include "aggregatecolumn.h"
#include "existsfilter.h"
#include "functioncolumn.h"
#include "selectfilter.h"
#include "objectreader.h"
#include "objectidmanager.h"
#include "sessionmanager.h"
#include "sessionmonitor.h"
#include "treenodeimpl.h"
using namespace execplan;
const char* OIDBitmapFilename = NULL;
int maxNewTxns = 1000;
class ExecPlanTest : public CppUnit::TestFixture
{
CPPUNIT_TEST_SUITE(ExecPlanTest);
CPPUNIT_TEST(selectExecutionPlan_1);
CPPUNIT_TEST(selectExecutionPlan_2);
CPPUNIT_TEST(expressionParser_1);
CPPUNIT_TEST(expressionParser_2);
CPPUNIT_TEST(aggregatecolumn_1);
CPPUNIT_TEST(arithmeticExpression_1);
CPPUNIT_TEST(arithmeticExpression_2);
CPPUNIT_TEST(arithmeticExpression_3);
CPPUNIT_TEST(copyTree);
CPPUNIT_TEST(treeNodeImpl);
CPPUNIT_TEST(serializeSimpleColumn);
CPPUNIT_TEST(serializeAggregateColumn);
CPPUNIT_TEST(serializeOperator);
CPPUNIT_TEST(serializeFilter);
CPPUNIT_TEST(serializeFunctionColumn);
CPPUNIT_TEST(serializeConstantColumn);
CPPUNIT_TEST(serializeParseTree);
CPPUNIT_TEST(serializeArithmeticColumn);
CPPUNIT_TEST(serializeSimpleFilter);
CPPUNIT_TEST(serializeCSEP);
CPPUNIT_TEST(serializeSelectFilter);
CPPUNIT_TEST(serializeExistsFilter);
CPPUNIT_TEST(sessionManager_1);
CPPUNIT_TEST(sessionManager_2);
CPPUNIT_TEST(sessionManager_3);
CPPUNIT_TEST(sessionManager_4);
unlink("/tmp/CalpontSessionMonitorShm");
// CPPUNIT_TEST( MonitorTestPlan_1 );
// CPPUNIT_TEST( MonitorTestPlan_1 );
unlink("/tmp/CalpontSessionMonitorShm");
CPPUNIT_TEST(objectIDManager_1);
if (OIDBitmapFilename != NULL)
unlink(OIDBitmapFilename);
CPPUNIT_TEST_SUITE_END();
private:
boost::shared_ptr<CalpontSystemCatalog> csc;
public:
static void walkfnString(const ParseTree* n)
{
cout << *(n->data()) << endl;
}
static void walkfnInt(const ExpressionTree<int>* n)
{
cout << n->data() << endl;
}
void setUp()
{
csc = CalpontSystemCatalog::makeCalpontSystemCatalog(0);
csc->identity(CalpontSystemCatalog::FE);
}
void tearDown()
{
}
void selectExecutionPlan_1()
{
cout << "SQL: select region.r_regionkey from region, nation where nation.n_regionkey = "
"region.r_regionkey and nation.n_regionkey != 3;"
<< endl;
CalpontSelectExecutionPlan csep;
CPPUNIT_ASSERT(csep.location() == CalpontSelectExecutionPlan::MAIN);
CPPUNIT_ASSERT(csep.dependent() == false);
CPPUNIT_ASSERT(csep.subSelects().size() == 0);
// returned columns
CalpontSelectExecutionPlan::ReturnedColumnList colList;
SimpleColumn* sc = new SimpleColumn("tpch.region.r_regionkey", 0);
colList.push_back(sc);
ArithmeticColumn* ac = new ArithmeticColumn("a+sum(r_regionkey)", 0);
colList.push_back(ac);
csep.returnedCols(colList);
CPPUNIT_ASSERT(csep.returnedCols().size() == 2);
// filters
CalpontSelectExecutionPlan::FilterTokenList filterTokenList;
SimpleFilter* sf = new SimpleFilter();
SimpleColumn* lhs = new SimpleColumn();
*lhs = *sc;
SimpleColumn* rhs = new SimpleColumn("tpch.nation.n_regionkey", 0);
CPPUNIT_ASSERT(*lhs == *sc);
CPPUNIT_ASSERT(*rhs != *lhs);
Operator* op = new Operator("=");
sf->op(op);
sf->lhs(lhs);
sf->rhs(rhs);
filterTokenList.push_back(sf);
filterTokenList.push_back(new Operator("And"));
SimpleFilter* sf1 = new SimpleFilter(new Operator("="), sc->clone(), ac->clone());
filterTokenList.push_back(sf1);
csep.filterTokenList(filterTokenList);
ParseTree* filterList = const_cast<ParseTree*>(csep.filters());
// draw filterList tree
filterList->drawTree("selectExecutionPlan_1.dot");
csep.filters(filterList);
// Group by
CalpontSelectExecutionPlan::GroupByColumnList groupByList;
groupByList.push_back(sc->clone());
csep.groupByCols(groupByList);
CPPUNIT_ASSERT(csep.groupByCols().size() == 1);
// Having
CalpontSelectExecutionPlan::FilterTokenList havingTokenList;
SimpleFilter* having =
new SimpleFilter(new Operator("="), new ArithmeticColumn("sum(volumn)", 0), new ConstantColumn(8));
havingTokenList.push_back(having);
csep.havingTokenList(havingTokenList);
CPPUNIT_ASSERT(*sf1 != *having);
CPPUNIT_ASSERT(csep.havingTokenList().size() == 1);
// Order by
CalpontSelectExecutionPlan::OrderByColumnList orderByList;
ArithmeticColumn* o1 = new ArithmeticColumn(*ac);
o1->asc(false);
orderByList.push_back(o1);
csep.orderByCols(orderByList);
CPPUNIT_ASSERT(csep.orderByCols().size() == 1);
// another csep
CalpontSelectExecutionPlan* newcsep = new CalpontSelectExecutionPlan(CalpontSelectExecutionPlan::FROM);
CalpontSelectExecutionPlan::ReturnedColumnList ncolList;
SimpleColumn* newsc = new SimpleColumn("tpch.region.r_regionkey", 0);
ncolList.push_back(newsc);
newcsep->returnedCols(ncolList);
CalpontSelectExecutionPlan::FilterTokenList nfilterTokenList;
SimpleFilter* newsf = new SimpleFilter(new Operator(">"), sc->clone(), newsc->clone());
nfilterTokenList.push_back(newsf);
newcsep->filterTokenList(nfilterTokenList);
CalpontSelectExecutionPlan::FilterTokenList nhavingTokenList;
SimpleFilter* newhaving = new SimpleFilter(new Operator(">"), sc->clone(), newsc->clone());
CPPUNIT_ASSERT(*newsf == *newhaving);
nhavingTokenList.push_back(newhaving);
newcsep->havingTokenList(nhavingTokenList);
CPPUNIT_ASSERT(*newcsep != csep);
CPPUNIT_ASSERT(*newcsep->filters() == *newcsep->having());
ByteStream b;
csep.serialize(b);
newcsep->unserialize(b);
CPPUNIT_ASSERT(csep == *newcsep);
CalpontSelectExecutionPlan::SelectList selectList;
selectList.push_back(newcsep);
csep.subSelects(selectList);
cout << "\nCalpont Execution Plan:" << endl;
cout << csep;
cout << " --- end of test 1 ---" << endl;
}
void selectExecutionPlan_2()
{
CalpontSelectExecutionPlan cep;
// select filter
CalpontSelectExecutionPlan* subselect = new CalpontSelectExecutionPlan;
subselect->location(CalpontSelectExecutionPlan::WHERE);
subselect->dependent(false);
CPPUNIT_ASSERT(subselect->location() == CalpontSelectExecutionPlan::WHERE);
CPPUNIT_ASSERT(subselect->dependent() == false);
ByteStream selb;
subselect->serialize(selb);
CalpontSelectExecutionPlan* subselect1 = new CalpontSelectExecutionPlan;
subselect->location(CalpontSelectExecutionPlan::WHERE);
subselect1->unserialize(selb);
SelectFilter* sef =
new SelectFilter(new SimpleColumn("tpch.region.r_regionkey", 0), new Operator(">="), subselect);
cout << *sef;
ByteStream b;
sef->serialize(b);
SelectFilter* newsef =
new SelectFilter(new SimpleColumn("tpch.nation.n_regionke", 0), new Operator("<="), subselect1);
newsef->unserialize(b);
CPPUNIT_ASSERT(*sef == *newsef);
delete sef;
delete newsef;
// simple filter
Filter* sf = new SimpleFilter(new Operator("="), new SimpleColumn("tpch.nation.n_regionke", 0),
new SimpleColumn("tpch.region.r_regionkey", 0));
cout << *sf;
ByteStream sfb;
SimpleFilter* sf2 = new SimpleFilter();
sf2->serialize(sfb);
sf->unserialize(sfb);
CPPUNIT_ASSERT(*sf == *sf2);
delete sf2;
delete sf;
// exist filter
CalpontSelectExecutionPlan* cep1 = new CalpontSelectExecutionPlan();
ExistsFilter* filter = new ExistsFilter();
delete filter;
filter = new ExistsFilter(cep1);
filter->exists(cep1);
const CalpontSelectExecutionPlan* cep2 = filter->exists();
cout << *filter;
CPPUNIT_ASSERT(*cep1 == *cep2);
CalpontSelectExecutionPlan::Parser parser;
std::vector<Token> tokens;
Token t;
t.value = filter;
tokens.push_back(t);
cep.filters(parser.parse(tokens.begin(), tokens.end()));
cout << cep;
cout << " --- end of test 2 ---" << endl;
}
void expressionParser_1()
{
cout << "\nPost order of int expression tree(pointer):" << endl;
ExpressionTree<int>* et = new ExpressionTree<int>(3);
ExpressionTree<int>* et1 = new ExpressionTree<int>(5);
ExpressionTree<int>* et2 = new ExpressionTree<int>(6);
et->left(et1);
et->right(et2);
et->walk(walkfnInt);
et->destroyTree(et);
cout << " --- end of test 3 ---" << endl;
}
void expressionParser_2()
{
cout << "\nPost order of int expression tree (reference):" << endl;
ExpressionTree<int> et(3);
ExpressionTree<int> et1(5);
ExpressionTree<int> et2(6);
et.left(&et1);
et.right(&et2);
et.walk(walkfnInt);
cout << " --- end of test 4 ---" << endl;
}
void aggregatecolumn_1()
{
cout << "\naggregate column: " << endl;
AggregateColumn a;
cout << a;
cout << "\nsum(p_type)" << endl;
AggregateColumn d("sum", "p_type");
cout << d;
a.functionName("avg");
ArithmeticColumn* b = new ArithmeticColumn("a-(b-c)");
a.functionParms(b);
CPPUNIT_ASSERT(a.functionName() == "avg");
cout << *const_cast<ReturnedColumn*>(a.functionParms());
cout << " --- end of test 5 ---" << endl;
}
void arithmeticExpression_1()
{
cout << "\narithmetic expression: " << endl;
string exp(
"substr(a)+ 100.00 * sum(tpch.part.p_type) / sum(tpch.lineitem.l_extendedprice "
"*(1-tpch.lineitem.l_discount))");
cout << exp << endl;
ArithmeticColumn a(exp, 0);
ParseTree* pt = const_cast<ParseTree*>(a.expression());
if (pt != NULL)
{
pt->walk(walkfnString);
pt->drawTree("arithmeticExpression_1.dot");
}
cout << " --- end of test 6 ---" << endl;
}
void copyTree()
{
// cout << "\narithmetic expression: " << endl;
string exp(
"substr(a)+ 100.00 * sum(tpch.part.p_type) / sum(tpch.lineitem.l_extendedprice "
"*(1-tpch.lineitem.l_discount))");
ArithmeticColumn a(exp);
ParseTree* pt = const_cast<ParseTree*>(a.expression());
ParseTree* newTree = new ParseTree();
// copy 1st time
newTree->copyTree(*pt);
// copy 2nd time, see if the existing tree is deleted.
newTree->copyTree(*pt);
// explicitly delete the 2nd copied tree
delete newTree;
}
void arithmeticExpression_2()
{
cout << "\nbuild arithmetic column by using accessing methods:" << endl;
CalpontSelectExecutionPlan::Parser parser;
std::vector<Token> tokens;
Token t;
ArithmeticColumn b;
cout << b;
ConstantColumn* c1 = new ConstantColumn();
c1->constval("'ASIA'");
c1->type(ConstantColumn::LITERAL);
// CPPUNIT_ASSERT (c1->data() == "'ASIA'(l)");
t.value = c1;
tokens.push_back(t);
t.value = new Operator("/");
tokens.push_back(t);
ConstantColumn* c2 = new ConstantColumn(5);
CPPUNIT_ASSERT(c2->type() == ConstantColumn::NUM);
t.value = c2;
tokens.push_back(t);
ParseTree* tree = parser.parse(tokens.begin(), tokens.end());
b.expression(tree);
cout << b;
cout << " --- end of test 7 ---" << endl;
}
void arithmeticExpression_3()
{
// invalid expression test
try
{
ArithmeticColumn a("-a+b", 0);
ArithmeticColumn d("a* b-", 0);
}
catch (const runtime_error& e)
{
cerr << e.what() << endl;
}
try
{
ArithmeticColumn e("a+substr (c from 1 4", 0);
}
catch (const runtime_error& e)
{
cerr << e.what() << endl;
}
try
{
ArithmeticColumn f("a + b c", 0);
}
catch (const runtime_error& e)
{
cerr << e.what() << endl;
}
try
{
ArithmeticColumn b("a + ((b+ c -e)", 0);
}
catch (const runtime_error& e)
{
cerr << e.what() << endl;
}
try
{
ArithmeticColumn g("a ++ b", 0); // valid
}
catch (const runtime_error& e)
{
cerr << e.what() << endl;
}
}
void treeNodeImpl()
{
TreeNodeImpl* node1 = new TreeNodeImpl();
TreeNodeImpl* node2 = new TreeNodeImpl("node2");
CPPUNIT_ASSERT(node2->data() == "node2");
CPPUNIT_ASSERT(*node1 != *node2);
ByteStream b;
node2->serialize(b);
node1->unserialize(b);
CPPUNIT_ASSERT(*node1 == *node2);
node2->data("node3");
cout << *node2;
TreeNodeImpl* node4 = node2->clone();
CPPUNIT_ASSERT(*node2 == node4);
delete node1;
delete node2;
delete node4;
}
void serializeSimpleColumn()
{
SimpleColumn s1, s2;
TreeNode* t;
ByteStream b;
t = &s2;
CPPUNIT_ASSERT(s1 == s2);
CPPUNIT_ASSERT(!(s1 != s2));
CPPUNIT_ASSERT(s1 == t);
CPPUNIT_ASSERT(!(s1 != t));
s1.schemaName("Schema Name 1");
s1.tableName("Table Name 1");
s1.columnName("Column Name 1");
// s1.tcn(5);
s1.data("sc1");
s1.serialize(b);
CPPUNIT_ASSERT(s2.schemaName() == "");
CPPUNIT_ASSERT(s2.tableName() == "");
CPPUNIT_ASSERT(s2.columnName() == "");
// CPPUNIT_ASSERT(s2.tcn() == 0);
CPPUNIT_ASSERT(s1 != s2);
CPPUNIT_ASSERT(s1 == s1);
CPPUNIT_ASSERT(s1 != t);
CPPUNIT_ASSERT(!(s1 == t));
s2.unserialize(b);
CPPUNIT_ASSERT(b.length() == 0);
CPPUNIT_ASSERT(s2.schemaName() == "Schema Name 1");
CPPUNIT_ASSERT(s2.tableName() == "Table Name 1");
CPPUNIT_ASSERT(s2.columnName() == "Column Name 1");
// CPPUNIT_ASSERT(s2.tcn() == 5);
CPPUNIT_ASSERT(s2.data() == "sc1");
CPPUNIT_ASSERT(s1 == s2);
CPPUNIT_ASSERT(!(s1 != s2));
CPPUNIT_ASSERT(s1 == t);
CPPUNIT_ASSERT(!(s1 != t));
}
void serializeAggregateColumn()
{
AggregateColumn a1, a2;
SimpleColumn* s1;
TreeNode* t;
ByteStream b;
t = &a2;
s1 = new SimpleColumn();
CPPUNIT_ASSERT(a1 == a2);
CPPUNIT_ASSERT(!(a1 != a2));
CPPUNIT_ASSERT(a1 == t);
CPPUNIT_ASSERT(!(a1 != t));
a1.functionName("AggregateColumn test");
a1.data("agg1");
a1.functionParms(s1);
a1.serialize(b);
CPPUNIT_ASSERT(a2.functionName() == "");
CPPUNIT_ASSERT(a2.data() == "");
CPPUNIT_ASSERT(a2.functionParms() == NULL);
CPPUNIT_ASSERT(a1 != a2);
CPPUNIT_ASSERT(!(a1 == a2));
CPPUNIT_ASSERT(a1 != t);
CPPUNIT_ASSERT(!(a1 == t));
a2.unserialize(b);
CPPUNIT_ASSERT(b.length() == 0);
CPPUNIT_ASSERT(a2.functionName() == "AggregateColumn test");
CPPUNIT_ASSERT(a2.data() == "agg1");
CPPUNIT_ASSERT(a2.functionParms() != NULL);
CPPUNIT_ASSERT(*(a2.functionParms()) == s1);
CPPUNIT_ASSERT(a1 == a2);
CPPUNIT_ASSERT(!(a1 != a2));
CPPUNIT_ASSERT(a1 == t);
CPPUNIT_ASSERT(!(a1 != t));
}
void serializeOperator()
{
Operator o1, o2;
TreeNode* t;
ByteStream b;
t = &o2;
CPPUNIT_ASSERT(o1 == o2);
CPPUNIT_ASSERT(!(o1 != o2));
CPPUNIT_ASSERT(o1 == t);
CPPUNIT_ASSERT(!(o1 != t));
o1.data("=");
CPPUNIT_ASSERT(o1 != o2);
CPPUNIT_ASSERT(!(o1 == o2));
CPPUNIT_ASSERT(o1 != t);
CPPUNIT_ASSERT(!(o1 == t));
o1.serialize(b);
CPPUNIT_ASSERT(o2.data() == "");
o2.unserialize(b);
CPPUNIT_ASSERT(b.length() == 0);
CPPUNIT_ASSERT(o2.data() == "=");
CPPUNIT_ASSERT(o1 == o2);
CPPUNIT_ASSERT(!(o1 != o2));
CPPUNIT_ASSERT(o1 == t);
CPPUNIT_ASSERT(!(o1 != t));
}
void serializeFilter()
{
Filter f1, f2;
TreeNode* t;
ByteStream b;
t = &f2;
CPPUNIT_ASSERT(f1 == f2);
CPPUNIT_ASSERT(!(f1 != f2));
CPPUNIT_ASSERT(f1 == t);
CPPUNIT_ASSERT(!(f1 != t));
f1.data("Filter test");
CPPUNIT_ASSERT(f1 != f2);
CPPUNIT_ASSERT(!(f1 == f2));
CPPUNIT_ASSERT(f1 != t);
CPPUNIT_ASSERT(!(f1 == t));
f1.serialize(b);
CPPUNIT_ASSERT(f2.data() == "");
f2.unserialize(b);
CPPUNIT_ASSERT(b.length() == 0);
CPPUNIT_ASSERT(f2.data() == "Filter test");
CPPUNIT_ASSERT(f1 == f2);
CPPUNIT_ASSERT(!(f1 != f2));
CPPUNIT_ASSERT(f1 == t);
CPPUNIT_ASSERT(!(f1 != t));
}
void serializeFunctionColumn()
{
FunctionColumn fc1, fc2;
TreeNode* t;
ByteStream b;
t = &fc2;
CPPUNIT_ASSERT(fc1 == fc2);
CPPUNIT_ASSERT(!(fc1 != fc2));
CPPUNIT_ASSERT(fc1 == t);
CPPUNIT_ASSERT(!(fc1 != t));
/* FunctionColumn */
fc1.sessionID(0);
fc1.functionName("FunctionColumn test");
fc1.functionParms("tpch.region.r_regionkey, tpch.nation.n_nationkey");
fc1.data("fc1");
CPPUNIT_ASSERT(fc1 != fc2);
CPPUNIT_ASSERT(!(fc1 == fc2));
CPPUNIT_ASSERT(fc1 != t);
CPPUNIT_ASSERT(!(fc1 == t));
FunctionColumn::FunctionParm functionParms;
functionParms = fc1.functionParms();
CPPUNIT_ASSERT(functionParms.size() == 2);
fc1.serialize(b);
CPPUNIT_ASSERT(fc2.functionName() == "");
CPPUNIT_ASSERT(fc2.functionParms().size() == 0);
fc2.unserialize(b);
CPPUNIT_ASSERT(b.length() == 0);
CPPUNIT_ASSERT(fc2.functionName() == "FunctionColumn test");
CPPUNIT_ASSERT(fc2.functionParms().size() == 2);
functionParms = fc2.functionParms();
CPPUNIT_ASSERT(functionParms.size() == 2);
CPPUNIT_ASSERT(fc1 == fc2);
CPPUNIT_ASSERT(!(fc1 != fc2));
CPPUNIT_ASSERT(fc1 == t);
CPPUNIT_ASSERT(!(fc1 != t));
}
void serializeConstantColumn()
{
ConstantColumn c1, c2;
TreeNode* t;
ByteStream b;
t = &c2;
CPPUNIT_ASSERT(c1 == c2);
CPPUNIT_ASSERT(!(c1 != c2));
CPPUNIT_ASSERT(c1 == t);
CPPUNIT_ASSERT(!(c1 != t));
c1.type(5);
c1.constval("ConstantColumn test");
c1.data("c1");
CPPUNIT_ASSERT(c1 != c2);
CPPUNIT_ASSERT(!(c1 == c2));
CPPUNIT_ASSERT(c1 != t);
CPPUNIT_ASSERT(!(c1 == t));
c1.serialize(b);
CPPUNIT_ASSERT(c2.constval() == "");
c2.unserialize(b);
CPPUNIT_ASSERT(b.length() == 0);
CPPUNIT_ASSERT(c2.type() == 5);
CPPUNIT_ASSERT(c2.constval() == "ConstantColumn test");
CPPUNIT_ASSERT(c1 == c2);
CPPUNIT_ASSERT(!(c1 != c2));
CPPUNIT_ASSERT(c1 == t);
CPPUNIT_ASSERT(!(c1 != t));
}
ParseTree* makeParseTree()
{
ParseTree* t[5];
SimpleColumn* s[5];
int i;
/* ParseTree (ExpressionTree<TreeNode*>) */
/*
t0(s0)
/ \
s1 s2
/ \
s3 s4
*/
for (i = 0; i < 5; i++)
{
t[i] = new ParseTree(NULL);
s[i] = new SimpleColumn();
t[i]->data(s[i]);
}
s[0]->schemaName("Schema Name 0");
s[1]->schemaName("Schema Name 1");
s[2]->schemaName("Schema Name 2");
s[3]->schemaName("Schema Name 3");
s[4]->schemaName("Schema Name 4");
t[0]->left(t[1]);
t[0]->right(t[2]);
t[1]->left(t[3]);
t[2]->right(t[4]);
return t[0];
}
void verifyParseTree(const ParseTree* t)
{
const ParseTree *ct, *ct2;
SimpleColumn* s;
CPPUNIT_ASSERT(t != NULL);
s = dynamic_cast<SimpleColumn*>(t->data());
CPPUNIT_ASSERT(s != NULL);
CPPUNIT_ASSERT(s->schemaName() == "Schema Name 0");
ct = t->left();
CPPUNIT_ASSERT(ct != NULL);
s = dynamic_cast<SimpleColumn*>(ct->data());
CPPUNIT_ASSERT(s != NULL);
CPPUNIT_ASSERT(s->schemaName() == "Schema Name 1");
ct2 = ct->left();
CPPUNIT_ASSERT(ct2 != NULL);
s = dynamic_cast<SimpleColumn*>(ct2->data());
CPPUNIT_ASSERT(s != NULL);
CPPUNIT_ASSERT(s->schemaName() == "Schema Name 3");
CPPUNIT_ASSERT(ct->right() == NULL);
CPPUNIT_ASSERT(ct2->left() == NULL);
CPPUNIT_ASSERT(ct2->right() == NULL);
ct = t->right();
CPPUNIT_ASSERT(ct != NULL);
s = dynamic_cast<SimpleColumn*>(ct->data());
CPPUNIT_ASSERT(s != NULL);
CPPUNIT_ASSERT(s->schemaName() == "Schema Name 2");
ct2 = ct->right();
CPPUNIT_ASSERT(ct2 != NULL);
s = dynamic_cast<SimpleColumn*>(ct2->data());
CPPUNIT_ASSERT(s != NULL);
CPPUNIT_ASSERT(s->schemaName() == "Schema Name 4");
CPPUNIT_ASSERT(ct->left() == NULL);
CPPUNIT_ASSERT(ct2->left() == NULL);
CPPUNIT_ASSERT(ct2->right() == NULL);
}
void serializeParseTree()
{
ByteStream b;
ParseTree *t, *tmodel;
t = makeParseTree();
tmodel = makeParseTree();
verifyParseTree(t); // sanity check on the test itself
CPPUNIT_ASSERT(*t == *tmodel);
CPPUNIT_ASSERT(!(*t != *tmodel));
ObjectReader::writeParseTree(t, b);
delete t;
t = ObjectReader::createParseTree(b);
CPPUNIT_ASSERT(b.length() == 0);
CPPUNIT_ASSERT(t != NULL);
verifyParseTree(t);
CPPUNIT_ASSERT(*t == *tmodel);
CPPUNIT_ASSERT(!(*t != *tmodel));
delete t;
delete tmodel;
}
ArithmeticColumn* makeArithmeticColumn()
{
ArithmeticColumn* ret;
ParseTree* t;
t = makeParseTree();
ret = new ArithmeticColumn();
ret->expression(t);
ret->alias("ArithmeticColumn");
ret->data("AD");
return ret;
}
void serializeArithmeticColumn()
{
ParseTree* t;
const ParseTree* ct;
ArithmeticColumn ac, ac2;
TreeNode* tn;
ByteStream b;
tn = &ac2;
CPPUNIT_ASSERT(ac == ac2);
CPPUNIT_ASSERT(!(ac != ac2));
CPPUNIT_ASSERT(ac == tn);
CPPUNIT_ASSERT(!(ac != tn));
t = makeParseTree();
ac.expression(t);
ac.alias("ArithmeticColumn");
ac.data("AD");
CPPUNIT_ASSERT(ac != ac2);
CPPUNIT_ASSERT(!(ac == ac2));
CPPUNIT_ASSERT(ac != tn);
CPPUNIT_ASSERT(!(ac == tn));
ac.serialize(b);
ac2.unserialize(b);
CPPUNIT_ASSERT(b.length() == 0);
ct = ac2.expression();
verifyParseTree(ct);
CPPUNIT_ASSERT(ac2.alias() == "ArithmeticColumn");
CPPUNIT_ASSERT(ac2.data() == "AD");
CPPUNIT_ASSERT(ac == ac2);
CPPUNIT_ASSERT(!(ac != ac2));
CPPUNIT_ASSERT(ac == tn);
CPPUNIT_ASSERT(!(ac != tn));
}
void serializeSimpleFilter()
{
ArithmeticColumn *pac1, *pac2;
const ArithmeticColumn *cpac1, *cpac2;
SimpleFilter sf1, sf2;
Operator* o1;
const Operator* co2;
TreeNode* t;
ByteStream b;
t = &sf2;
CPPUNIT_ASSERT(sf1 == sf2);
CPPUNIT_ASSERT(!(sf1 != sf2));
CPPUNIT_ASSERT(sf1 == t);
CPPUNIT_ASSERT(!(sf1 != t));
pac1 = makeArithmeticColumn();
pac2 = makeArithmeticColumn();
CPPUNIT_ASSERT(b.length() == 0);
CPPUNIT_ASSERT(pac1 != NULL);
CPPUNIT_ASSERT(pac2 != NULL);
o1 = new Operator("=");
sf1.lhs(pac1);
sf1.rhs(pac2);
sf1.op(o1);
sf1.serialize(b);
CPPUNIT_ASSERT(sf1 != sf2);
CPPUNIT_ASSERT(!(sf1 == sf2));
CPPUNIT_ASSERT(sf1 != t);
CPPUNIT_ASSERT(!(sf1 == t));
sf2.unserialize(b);
CPPUNIT_ASSERT(b.length() == 0);
cpac1 = dynamic_cast<const ArithmeticColumn*>(sf2.lhs());
CPPUNIT_ASSERT(cpac1 != NULL);
verifyParseTree(cpac1->expression());
cpac2 = dynamic_cast<const ArithmeticColumn*>(sf2.rhs());
CPPUNIT_ASSERT(cpac2 != NULL);
verifyParseTree(cpac2->expression());
co2 = sf2.op();
CPPUNIT_ASSERT(co2 != NULL);
CPPUNIT_ASSERT(co2->data() == o1->data());
CPPUNIT_ASSERT(sf1 == sf2);
CPPUNIT_ASSERT(!(sf1 != sf2));
CPPUNIT_ASSERT(sf1 == t);
CPPUNIT_ASSERT(!(sf1 != t));
}
void serializeCSEP()
{
/*
* CalpontSelectExecutionPlan
* This is a large class; it makes more sense to write == operators
* for everything than to write a giant equivalance test here.
* For now this is mostly a regression test.
*/
CalpontSelectExecutionPlan csep1, csep2;
CalpontSelectExecutionPlan::ReturnedColumnList colList;
ParseTree* filterList;
CalpontExecutionPlan* cep;
ByteStream b;
cep = &csep2;
CPPUNIT_ASSERT(csep1 == csep2);
CPPUNIT_ASSERT(!(csep1 != csep2));
CPPUNIT_ASSERT(csep1 == cep);
CPPUNIT_ASSERT(!(csep1 != cep));
// returned columns
SimpleColumn* sc = new SimpleColumn("tpch.region.r_regionkey");
colList.push_back(sc);
// filters
CalpontSelectExecutionPlan::Parser parser;
std::vector<Token> tokens;
Token t;
SimpleFilter* sf = new SimpleFilter();
SimpleColumn* lhs = new SimpleColumn(*sc);
SimpleColumn* rhs = new SimpleColumn("tpch.nation.n_regionkey");
Operator* op = new Operator("=");
sf->op(op);
sf->lhs(lhs);
sf->rhs(rhs);
t.value = sf;
tokens.push_back(t);
Operator* op1 = new Operator("and");
t.value = op1;
tokens.push_back(t);
SimpleFilter* sf1 = new SimpleFilter();
SimpleColumn* lhs1 = new SimpleColumn(*rhs);
ConstantColumn* constCol = new ConstantColumn("3", ConstantColumn::NUM);
Operator* op2 = new Operator("!=");
sf1->op(op2);
sf1->lhs(lhs1);
sf1->rhs(constCol);
t.value = sf1;
tokens.push_back(t);
filterList = parser.parse(tokens.begin(), tokens.end());
// draw filterList tree
filterList->drawTree("selectExecutionPlan_1.dot");
// calpont execution plan
csep1.returnedCols(colList);
csep1.filters(filterList);
CPPUNIT_ASSERT(csep1 != csep2);
CPPUNIT_ASSERT(!(csep1 == csep2));
CPPUNIT_ASSERT(csep1 != cep);
CPPUNIT_ASSERT(!(csep1 == cep));
csep1.serialize(b);
csep2.unserialize(b);
CPPUNIT_ASSERT(b.length() == 0);
CPPUNIT_ASSERT(csep1 == csep2);
CPPUNIT_ASSERT(!(csep1 != csep2));
CPPUNIT_ASSERT(csep1 == cep);
CPPUNIT_ASSERT(!(csep1 != cep));
CalpontSelectExecutionPlan csep3, csep4;
// subselect
CalpontSelectExecutionPlan* subselect = new CalpontSelectExecutionPlan;
subselect->location(CalpontSelectExecutionPlan::WHERE);
subselect->dependent(false);
CPPUNIT_ASSERT(subselect->location() == CalpontSelectExecutionPlan::WHERE);
CPPUNIT_ASSERT(subselect->dependent() == false);
CalpontSelectExecutionPlan::SelectList selectList;
selectList.push_back(subselect);
csep3.subSelects(selectList);
// exist filter
CalpontSelectExecutionPlan* cep1 = new CalpontSelectExecutionPlan();
ExistsFilter* filter = new ExistsFilter();
delete filter;
filter = new ExistsFilter(cep1);
filter->exists(cep1);
// CalpontSelectExecutionPlan* cep2 = const_cast<CalpontSelectExecutionPlan*>(filter->exists());
CalpontSelectExecutionPlan::Parser parser1;
std::vector<Token> tokens1;
Token t1;
t1.value = filter;
tokens1.push_back(t1);
csep3.filters(parser1.parse(tokens1.begin(), tokens1.end()));
csep3.serialize(b);
csep4.unserialize(b);
CPPUNIT_ASSERT(csep3 == csep4);
CPPUNIT_ASSERT(!(csep3 != csep4));
}
void serializeSelectFilter()
{
ByteStream b;
ArithmeticColumn* pac1;
Operator* o1;
const Operator* co2;
CalpontSelectExecutionPlan csep1;
SelectFilter sel1, sel2;
const ArithmeticColumn* cpac1;
const ParseTree* ct;
TreeNode* t;
t = &sel2;
CPPUNIT_ASSERT(sel1 == sel2);
CPPUNIT_ASSERT(!(sel1 != sel2));
CPPUNIT_ASSERT(sel1 == t);
CPPUNIT_ASSERT(!(sel1 != t));
pac1 = makeArithmeticColumn();
o1 = new Operator("=");
sel1.lhs(pac1);
sel1.op(o1);
CPPUNIT_ASSERT(sel1 != sel2);
CPPUNIT_ASSERT(!(sel1 == sel2));
CPPUNIT_ASSERT(sel1 != t);
CPPUNIT_ASSERT(!(sel1 == t));
sel1.serialize(b);
sel2.unserialize(b);
CPPUNIT_ASSERT(b.length() == 0);
CPPUNIT_ASSERT(sel1 == sel2);
CPPUNIT_ASSERT(!(sel1 != sel2));
CPPUNIT_ASSERT(sel1 == t);
CPPUNIT_ASSERT(!(sel1 != t));
cpac1 = dynamic_cast<const ArithmeticColumn*>(sel2.lhs());
CPPUNIT_ASSERT(cpac1 != NULL);
ct = cpac1->expression();
verifyParseTree(ct);
co2 = sel2.op();
CPPUNIT_ASSERT(co2 != NULL);
CPPUNIT_ASSERT(co2->data() == o1->data());
}
/* ExistFilters get tested as part of the CSEP test at the moment. */
void serializeExistsFilter()
{
ExistsFilter ef1, ef2;
ByteStream b;
ef2.data("ExistsFilter test");
ef1.serialize(b);
ef2.unserialize(b);
CPPUNIT_ASSERT(b.length() == 0);
CPPUNIT_ASSERT(ef2.data() == "");
}
void objectIDManager_1()
{
int oid, oidBase;
// fake out the objmgr...
setenv("CALPONT_CONFIG_FILE", "/usr/local/mariadb/columnstore/etc/Columnstore.xml", 1);
Config* cf = Config::makeConfig();
cf->setConfig("OIDManager", "OIDBitmapFile", "./oidbitmap");
try
{
ObjectIDManager o;
OIDBitmapFilename = strdup(o.getFilename().c_str());
oidBase = o.allocOID();
oid = o.allocOID();
CPPUNIT_ASSERT(oid == oidBase + 1);
oid = o.allocOIDs(20);
CPPUNIT_ASSERT(oid == oidBase + 2);
oid = o.allocOIDs(20);
CPPUNIT_ASSERT(oid == oidBase + 22);
o.returnOID(oidBase + 5);
oid = o.allocOID();
CPPUNIT_ASSERT(oid == oidBase + 5);
o.returnOID(oidBase + 5);
oid = o.allocOIDs(20);
CPPUNIT_ASSERT(oid == oidBase + 42);
oid = o.allocOID();
CPPUNIT_ASSERT(oid == oidBase + 5);
o.returnOIDs(oidBase, oidBase + 61);
oid = o.allocOID();
CPPUNIT_ASSERT(oid == oidBase);
o.returnOID(0);
}
catch (...)
{
if (OIDBitmapFilename != NULL)
unlink(OIDBitmapFilename); // XXXPAT: fix this when libstdc++ regains its sanity
throw;
}
unlink(OIDBitmapFilename);
}
/*
* destroySemaphores() and destroyShmseg() will print error messages
* if there are no objects to destroy. That's OK.
*/
void destroySemaphores()
{
key_t semkey;
int sems, err;
semkey = SESSIONMANAGER_SYSVKEY;
sems = semget(semkey, 2, 0666);
if (sems != -1)
{
err = semctl(sems, 0, IPC_RMID);
if (err == -1)
perror("tdriver: semctl");
}
}
void destroyShmseg()
{
key_t shmkey;
int shms, err;
shmkey = SESSIONMANAGER_SYSVKEY;
shms = shmget(shmkey, 0, 0666);
if (shms != -1)
{
err = shmctl(shms, IPC_RMID, NULL);
if (err == -1 && errno != EINVAL)
{
perror("tdriver: shmctl");
return;
}
}
}
void sessionManager_1()
{
SessionManager* sm = NULL;
SessionManager::TxnID txn;
const SessionManager::SIDTIDEntry* activeTxns;
int len;
string filename;
// destroySemaphores();
// destroyShmseg();
try
{
sm = new SessionManager();
// CPPUNIT_ASSERT(sm->verID() == 0);
filename = sm->getTxnIDFilename();
delete sm;
sm = new SessionManager();
txn = sm->newTxnID(0);
CPPUNIT_ASSERT(txn.valid == true);
// CPPUNIT_ASSERT(txn.id == 1);
// CPPUNIT_ASSERT(sm->verID() == 1);
txn = sm->newTxnID(1);
CPPUNIT_ASSERT(txn.valid == true);
// CPPUNIT_ASSERT(txn.id == 2);
// CPPUNIT_ASSERT(sm->verID() == 2);
activeTxns = sm->SIDTIDMap(len);
CPPUNIT_ASSERT(activeTxns != NULL);
CPPUNIT_ASSERT(len == 2);
txn = sm->getTxnID(0);
CPPUNIT_ASSERT(txn.valid == true);
// CPPUNIT_ASSERT(txn.id == 1);
CPPUNIT_ASSERT(txn.valid == activeTxns[0].txnid.valid);
// CPPUNIT_ASSERT(txn.id == activeTxns[0].txnid.id);
txn = sm->getTxnID(1);
CPPUNIT_ASSERT(txn.valid == true);
// CPPUNIT_ASSERT(txn.id == 2);
CPPUNIT_ASSERT(txn.valid == activeTxns[1].txnid.valid);
// CPPUNIT_ASSERT(txn.id == activeTxns[1].txnid.id);
delete[] activeTxns;
// make sure it's consistent across invocations
delete sm;
sm = new SessionManager();
activeTxns = sm->SIDTIDMap(len);
CPPUNIT_ASSERT(activeTxns != NULL);
CPPUNIT_ASSERT(len == 2);
txn = sm->getTxnID(0);
CPPUNIT_ASSERT(txn.valid == true);
// CPPUNIT_ASSERT(txn.id == 1);
CPPUNIT_ASSERT(txn.valid == activeTxns[0].txnid.valid);
// CPPUNIT_ASSERT(txn.id == activeTxns[0].txnid.id);
txn = sm->getTxnID(1);
CPPUNIT_ASSERT(txn.valid == true);
// CPPUNIT_ASSERT(txn.id == 2);
CPPUNIT_ASSERT(txn.valid == activeTxns[1].txnid.valid);
// CPPUNIT_ASSERT(txn.id == activeTxns[1].txnid.id);
sm->rolledback(txn);
CPPUNIT_ASSERT(txn.valid == false);
txn = sm->getTxnID(0);
sm->committed(txn);
CPPUNIT_ASSERT(txn.valid == false);
delete[] activeTxns;
activeTxns = sm->SIDTIDMap(len);
CPPUNIT_ASSERT(len == 0);
delete[] activeTxns;
}
catch (runtime_error& e)
{
cout << "caught runtime_error (why doesn't cppunit notice these?): " << e.what() << endl;
if (sm != NULL)
delete sm;
// destroySemaphores();
// destroyShmseg();
throw logic_error("Hey! Stop!");
}
catch (exception& e)
{
cout << "caught exception: " << e.what() << endl;
if (sm != NULL)
delete sm;
// destroySemaphores();
// destroyShmseg();
throw;
}
delete sm;
// destroySemaphores();
// destroyShmseg();
}
/** Verifies that we can only have MaxTxns (1000 right now) active transactions at
any given time */
void sessionManager_2()
{
int i;
SessionManager* sm;
SessionManager::TxnID txns[1001];
string filename;
// destroySemaphores();
// destroyShmseg();
sm = new SessionManager();
filename = sm->getTxnIDFilename();
delete sm;
sm = new SessionManager();
for (i = 0; i < 1000; i++)
{
txns[i] = sm->newTxnID(i, false);
CPPUNIT_ASSERT(txns[i].valid == true);
// CPPUNIT_ASSERT(sm->verID() == txns[i].id);
}
txns[1000] = sm->newTxnID(i, false);
CPPUNIT_ASSERT(txns[1000].valid == false);
for (i = 999; i >= 0; i--)
{
SessionManager::TxnID tmp = sm->getTxnID(i);
CPPUNIT_ASSERT(tmp.valid == txns[i].valid == true);
CPPUNIT_ASSERT(tmp.id == txns[i].id);
sm->committed(txns[i]);
tmp = sm->getTxnID(i);
CPPUNIT_ASSERT(tmp.valid == false);
CPPUNIT_ASSERT(txns[i].valid == false);
}
try
{
sm->committed(txns[1000]);
}
// expected exception
catch (invalid_argument& e)
{
}
delete sm;
// destroySemaphores();
// destroyShmseg();
}
/** Verifies that transaction IDs get saved and restored correctly across "reboots" */
void sessionManager_3()
{
SessionManager* sm;
string filename;
SessionManager::TxnID txnid;
// scrub env
sm = new SessionManager();
filename = sm->getTxnIDFilename();
delete sm;
// destroyShmseg();
// destroySemaphores();
sm = new SessionManager();
txnid = sm->newTxnID(0);
// CPPUNIT_ASSERT(txnid.id == 1);
delete sm;
// destroyShmseg();
// destroySemaphores();
sm = new SessionManager();
sm->committed(txnid);
txnid = sm->newTxnID(1);
// CPPUNIT_ASSERT(txnid.id == 2);
delete sm;
sm = new SessionManager();
sm->committed(txnid);
// destroyShmseg();
// destroySemaphores();
}
void sessionManager_4()
{
char* buf;
int len;
SessionManager sm;
buf = sm.getShmContents(len);
CPPUNIT_ASSERT(len > 0);
CPPUNIT_ASSERT(buf != NULL);
delete[] buf;
}
SessionManager* manager;
SessionManager::TxnID managerTxns[1000];
int createTxns(const int& start, const int& end)
{
const int first = start;
const int last = end;
int newTxns = 0;
int verifyLen = 0;
verifyLen = manager->verifySize();
for (int idx = first; idx < last && verifyLen < maxNewTxns; idx++)
{
managerTxns[idx] = manager->newTxnID((uint32_t)idx + 1000);
CPPUNIT_ASSERT(managerTxns[idx].id > 0);
CPPUNIT_ASSERT(managerTxns[idx].valid == true);
verifyLen = manager->verifySize();
newTxns++;
}
CPPUNIT_ASSERT(newTxns == last - first);
return newTxns;
}
int closeTxns(const int& start, const int& end)
{
int first = start;
int last = end;
int totalClosed = 0;
for (int idx = first; idx < last; idx++)
{
try
{
SessionManager::TxnID tmp = manager->getTxnID(idx + 1000);
if (tmp.valid == true)
{
manager->committed(tmp);
CPPUNIT_ASSERT(tmp.valid == false);
totalClosed++;
}
}
catch (exception& e)
{
cerr << e.what() << endl;
continue;
}
}
return totalClosed;
} // closeTxns
void MonitorTestPlan_1()
{
int currStartTxn = 0;
int currEndTxn = 5;
int txnCntIncr = 5;
const int sleepTime = 11;
const int iterMax = 1;
vector<SessionMonitor::MonSIDTIDEntry*> toTxns;
destroySemaphores();
destroyShmseg();
manager = new SessionManager();
manager->reset();
CPPUNIT_ASSERT(manager->verifySize() == 0);
SessionMonitor* monitor = NULL;
for (int jdx = 0; jdx < iterMax; jdx++)
{
// store the current state of the SessionManager
monitor = new SessionMonitor();
delete monitor;
int idx = 0;
int grpStart = currStartTxn;
for (idx = 0; idx < 3; idx++)
{
createTxns(currStartTxn, currEndTxn);
CPPUNIT_ASSERT(manager->verifySize() == (idx + 1) * txnCntIncr);
currStartTxn += txnCntIncr;
currEndTxn += txnCntIncr;
sleep(sleepTime); // make sessions time out
monitor = new SessionMonitor(); // read Monitor data
toTxns.clear();
toTxns = monitor->timedOutTxns(); // get timed out txns
CPPUNIT_ASSERT(toTxns.size() == (uint32_t)txnCntIncr * idx);
delete monitor;
}
int grpEnd = currEndTxn;
monitor = new SessionMonitor();
closeTxns(grpStart, grpEnd); // close this iteration of txns
CPPUNIT_ASSERT(manager->verifySize() == 0);
toTxns = monitor->timedOutTxns(); // get timed out txns
CPPUNIT_ASSERT(toTxns.size() == 0);
delete monitor;
}
monitor = new SessionMonitor(); // readload Monitor data
toTxns.clear();
toTxns = monitor->timedOutTxns(); // get timed out txns
CPPUNIT_ASSERT(toTxns.size() == 0);
delete monitor;
CPPUNIT_ASSERT(manager->verifySize() == 0);
if (manager)
delete manager;
}
};
CPPUNIT_TEST_SUITE_REGISTRATION(ExecPlanTest);
#include <cppunit/extensions/TestFactoryRegistry.h>
#include <cppunit/ui/text/TestRunner.h>
int main(int argc, char** argv)
{
CppUnit::TextUi::TestRunner runner;
CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry();
runner.addTest(registry.makeTest());
bool wasSuccessful = runner.run("", false);
return (wasSuccessful ? 0 : 1);
}