You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-30 07:25:34 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			1070 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			1070 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (C) 2014 InfiniDB, Inc.
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or
 | |
|    modify it under the terms of the GNU General Public License
 | |
|    as published by the Free Software Foundation; version 2 of
 | |
|    the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | |
|    MA 02110-1301, USA. */
 | |
| 
 | |
| /*****************************************************************************
 | |
|  * $Id: tdriver-jobstep.cpp 9210 2013-01-21 14:10:42Z rdempsey $
 | |
|  *
 | |
|  ****************************************************************************/
 | |
| 
 | |
| #include <iostream>
 | |
| 
 | |
| #include <cppunit/extensions/HelperMacros.h>
 | |
| #include <cppunit/extensions/TestFactoryRegistry.h>
 | |
| #include <cppunit/ui/text/TestRunner.h>
 | |
| 
 | |
| #include "joblist.h"
 | |
| #include "jobstep.h"
 | |
| #include "distributedenginecomm.h"
 | |
| #include "calpontsystemcatalog.h"
 | |
| #include "zdl.h"
 | |
| 
 | |
| using namespace std;
 | |
| using namespace joblist;
 | |
| using namespace execplan;
 | |
| 
 | |
// Number of elements generated for the input datalists of the reduce/union
// tests.  Always referenced as ::count in this file (presumably to keep it
// distinct from std::count under `using namespace std` -- confirm).
uint64_t count = 1000 * 1000;

// Elements are collected in a vector and inserted into a ZDL once the vector
// holds at least this many entries.
const uint64_t ZDL_VEC_SIZE = 4 * 1024;
 | |
| 
 | |
| class JobStepDriver : public CppUnit::TestFixture
 | |
| {
 | |
|   CPPUNIT_TEST_SUITE(JobStepDriver);
 | |
| 
 | |
|   /* These rely on Patrick's DB */
 | |
|   // CPPUNIT_TEST(pColScan_1);
 | |
|   // CPPUNIT_TEST(pColStep_1);
 | |
|   // CPPUNIT_TEST(pColStep_2);
 | |
|   // CPPUNIT_TEST(pColStep_as_ProjectionStep_1);
 | |
| 
 | |
|   // CPPUNIT_TEST(pnljoin_1);	// value list, no rid list, no reduction step
 | |
|   // CPPUNIT_TEST(pnljoin_2);	// value list, w/rid list, no reduction step
 | |
|   // CPPUNIT_TEST(pnljoin_3);	// value list + rid list + reduction step
 | |
| 
 | |
|   CPPUNIT_TEST(reduceStep_1);  // ElementType
 | |
|   CPPUNIT_TEST(reduceStep_2);  // StringElementType
 | |
|                                // CPPUNIT_TEST(reduceStep_3); // DoubleElementType
 | |
|   // CPPUNIT_TEST(reduceStep_4); // reduceStep_1 with BucketDLs as inputs
 | |
| 
 | |
|   CPPUNIT_TEST(unionStep_1);
 | |
|   // CPPUNIT_TEST(unionStep_2);
 | |
|   // CPPUNIT_TEST(unionStep_3);
 | |
| 
 | |
|   CPPUNIT_TEST_SUITE_END();
 | |
| 
 | |
|   ResourceManager fRm;
 | |
| 
 | |
|  private:
 | |
|  public:
 | |
|   void pColScan_1()
 | |
|   {
 | |
|     DistributedEngineComm* dec;
 | |
|     boost::shared_ptr<CalpontSystemCatalog> cat;
 | |
| 
 | |
|     dec = DistributedEngineComm::instance(fRm);
 | |
|     // 	dec = DistributedEngineComm::instance("./config-dec.xml");
 | |
|     cat = CalpontSystemCatalog::makeCalpontSystemCatalog();
 | |
| 
 | |
|     JobStepAssociation inJs;
 | |
|     JobStepAssociation outJs;
 | |
| 
 | |
|     AnyDataListSPtr spdl1(new AnyDataList());
 | |
|     BandedDL<ElementType>* dl1 = new BandedDL<ElementType>(1, fRm);
 | |
|     spdl1->bandedDL(dl1);
 | |
|     outJs.outAdd(spdl1);
 | |
| 
 | |
|     pColScanStep step0(inJs, outJs, dec, cat, 1003, 1000, 12345, 999, 7, 0, 0, fRm);
 | |
|     int8_t cop;
 | |
|     int64_t filterValue;
 | |
|     cop = COMPARE_GE;
 | |
|     filterValue = 3010;
 | |
|     step0.addFilter(cop, filterValue);
 | |
|     cop = COMPARE_LE;
 | |
|     filterValue = 3318;
 | |
|     step0.addFilter(cop, filterValue);
 | |
|     step0.setBOP(BOP_AND);
 | |
|     inJs = outJs;
 | |
| 
 | |
|     step0.run();
 | |
| 
 | |
|     step0.join();
 | |
| 
 | |
|     DeliveryStep step1(inJs, outJs, make_table("CALPONTSYS", "SYSTABLE"), cat, 10000, 0, 0, 0);
 | |
|     inJs = outJs;
 | |
| 
 | |
|     step1.run();
 | |
| 
 | |
|     step1.join();
 | |
|   }
 | |
| 
 | |
|   void pColStep_1()
 | |
|   {
 | |
|     DistributedEngineComm* dec;
 | |
|     boost::shared_ptr<CalpontSystemCatalog> cat;
 | |
|     ElementType e;
 | |
|     int i, it;
 | |
|     bool more;
 | |
| 
 | |
|     dec = DistributedEngineComm::instance(fRm);
 | |
|     // 	dec = DistributedEngineComm::instance("./config-dec.xml");
 | |
|     cat = CalpontSystemCatalog::makeCalpontSystemCatalog();
 | |
| 
 | |
|     JobStepAssociation inJs;
 | |
|     JobStepAssociation outJs;
 | |
| 
 | |
|     AnyDataListSPtr spdl1(new AnyDataList());
 | |
|     BandedDL<ElementType>* dl1 = new BandedDL<ElementType>(1, fRm);
 | |
|     spdl1->bandedDL(dl1);
 | |
|     inJs.outAdd(spdl1);
 | |
| 
 | |
|     AnyDataListSPtr spdl2(new AnyDataList());
 | |
|     BandedDL<ElementType>* dl2 = new BandedDL<ElementType>(1, fRm);
 | |
|     spdl2->bandedDL(dl2);
 | |
|     outJs.outAdd(spdl2);
 | |
| 
 | |
|     for (i = 10; i < 15; i++)
 | |
|     {
 | |
|       e.first = i;
 | |
|       dl1->insert(e);
 | |
|     }
 | |
| 
 | |
|     dl1->endOfInput();
 | |
| 
 | |
|     pColStep p(inJs, outJs, dec, cat, 1003, 1000, 12346, 11, 11, 1, 0, fRm);
 | |
| 
 | |
|     p.setRidList(dl1);  // JSA should do this
 | |
|     p.run();
 | |
|     p.join();
 | |
| 
 | |
|     it = dl2->getIterator();
 | |
| 
 | |
|     for (more = dl2->next(it, &e), i = 0; more; more = dl2->next(it, &e), i++)
 | |
| #ifdef DEBUG
 | |
|       cout << "<rid = " << e.first << ", value = " << e.second << ">" << endl;
 | |
| 
 | |
| #else
 | |
|       ;  // walk the list silently
 | |
| #endif
 | |
|     CPPUNIT_ASSERT(i == 5);
 | |
|   }
 | |
| 
 | |
|   /* make sure it issues multiple primitive msgs correctly */
 | |
|   void pColStep_2()
 | |
|   {
 | |
|     DistributedEngineComm* dec;
 | |
|     boost::shared_ptr<CalpontSystemCatalog> cat;
 | |
|     ElementType e;
 | |
|     int i, it;
 | |
|     bool more;
 | |
| 
 | |
|     dec = DistributedEngineComm::instance(fRm);
 | |
|     // 	dec = DistributedEngineComm::instance("./config-dec.xml");
 | |
|     cat = CalpontSystemCatalog::makeCalpontSystemCatalog();
 | |
| 
 | |
|     JobStepAssociation inJs;
 | |
|     JobStepAssociation outJs;
 | |
| 
 | |
|     AnyDataListSPtr spdl1(new AnyDataList());
 | |
|     BandedDL<ElementType>* dl1 = new BandedDL<ElementType>(1, fRm);
 | |
|     spdl1->bandedDL(dl1);
 | |
|     inJs.outAdd(spdl1);
 | |
| 
 | |
|     AnyDataListSPtr spdl2(new AnyDataList());
 | |
|     BandedDL<ElementType>* dl2 = new BandedDL<ElementType>(1, fRm);
 | |
|     spdl2->bandedDL(dl2);
 | |
|     outJs.outAdd(spdl2);
 | |
| 
 | |
|     for (i = 10; i < 10000; i++)
 | |
|     {
 | |
|       if (i % 2 == 0)  // make it sparse
 | |
|       {
 | |
|         e.first = i;
 | |
|         dl1->insert(e);
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     dl1->endOfInput();
 | |
| 
 | |
|     pColStep p(inJs, outJs, dec, cat, 1003, 1000, 12347, 11, 11, 1, 0, fRm);
 | |
| 
 | |
|     p.setRidList(dl1);  // JSA should do this
 | |
|     p.run();
 | |
|     p.join();
 | |
| 
 | |
|     it = dl2->getIterator();
 | |
| 
 | |
|     for (more = dl2->next(it, &e), i = 0; more; more = dl2->next(it, &e), i++)
 | |
| #ifdef DEBUG
 | |
|       cout << "<rid = " << e.first << ", value = " << e.second << ">" << endl;
 | |
| 
 | |
| #else
 | |
|       ;  // walk the list silently
 | |
| #endif
 | |
|     CPPUNIT_ASSERT(i == 6);
 | |
|   }
 | |
| 
 | |
|   void pColStep_as_ProjectionStep_1()
 | |
|   {
 | |
|     DistributedEngineComm* dec;
 | |
|     boost::shared_ptr<CalpontSystemCatalog> cat;
 | |
|     ElementType e;
 | |
|     int i, it;
 | |
|     bool more;
 | |
| 
 | |
|     dec = DistributedEngineComm::instance(fRm);
 | |
|     // 	dec = DistributedEngineComm::instance("./config-dec.xml");
 | |
|     cat = CalpontSystemCatalog::makeCalpontSystemCatalog();
 | |
| 
 | |
|     JobStepAssociation inJs;
 | |
|     JobStepAssociation outJs;
 | |
| 
 | |
|     AnyDataListSPtr spdl1(new AnyDataList());
 | |
|     BandedDL<ElementType>* dl1 = new BandedDL<ElementType>(1, fRm);
 | |
|     spdl1->bandedDL(dl1);
 | |
|     inJs.outAdd(spdl1);
 | |
| 
 | |
|     AnyDataListSPtr spdl2(new AnyDataList());
 | |
|     BandedDL<ElementType>* dl2 = new BandedDL<ElementType>(1, fRm);
 | |
|     spdl2->bandedDL(dl2);
 | |
|     outJs.outAdd(spdl2);
 | |
| 
 | |
|     for (i = 1; i <= 21; i++)
 | |
|     {
 | |
|       e.first = i;
 | |
|       dl1->insert(e);
 | |
|     }
 | |
| 
 | |
|     dl1->endOfInput();
 | |
| 
 | |
|     // flushInterval = 8
 | |
|     pColStep p(inJs, outJs, dec, cat, 1003, 1000, 12348, 11, 11, 1, 2, fRm);
 | |
| 
 | |
|     p.setRidList(dl1);  // JSA should do this
 | |
|     p.run();
 | |
|     p.join();
 | |
| 
 | |
|     it = dl2->getIterator();
 | |
| 
 | |
|     for (more = dl2->next(it, &e), i = 0; more; more = dl2->next(it, &e), i++)
 | |
| #ifdef DEBUG
 | |
|       cout << "<rid = " << e.first << ", value = " << e.second << ">" << endl;
 | |
| 
 | |
| #else
 | |
|       ;  // walk the list silently
 | |
| #endif
 | |
|     // 		CPPUNIT_ASSERT(i == 5);
 | |
|   }
 | |
| 
 | |
|   void reduceStep_1()
 | |
|   {
 | |
|     JobStepAssociation in, out;
 | |
|     AnyDataList *inputADL, *driverADL, *outputADL;
 | |
|     AnyDataListSPtr inputSPtr, driverSPtr, outputSPtr;
 | |
|     WSDL<ElementType>* outDL;
 | |
|     ElementType e;
 | |
|     unsigned i;
 | |
|     int it;
 | |
|     bool more;
 | |
| 
 | |
|     ZDL<ElementType>* inDL = new ZDL<ElementType>(1, fRm);
 | |
|     ZDL<ElementType>* dDL = new ZDL<ElementType>(1, fRm);
 | |
|     outDL = new WSDL<ElementType>(1, 100000, fRm);
 | |
|     inputADL = new AnyDataList();
 | |
|     driverADL = new AnyDataList();
 | |
|     outputADL = new AnyDataList();
 | |
|     inputADL->zonedDL(inDL);
 | |
|     driverADL->zonedDL(dDL);
 | |
|     outputADL->workingSetDL(outDL);
 | |
|     inputSPtr.reset(inputADL);
 | |
|     driverSPtr.reset(driverADL);
 | |
|     outputSPtr.reset(outputADL);
 | |
| 
 | |
|     in.outAdd(inputSPtr);
 | |
|     in.outAdd(driverSPtr);
 | |
|     out.outAdd(outputSPtr);
 | |
| 
 | |
|     // 		cout << "making input DataList" << endl;
 | |
|     vector<ElementType> vec1;
 | |
|     vector<ElementType> vec2;
 | |
| 
 | |
|     for (i = 0; i < ::count; i++)
 | |
|     {
 | |
|       e.first = i;
 | |
|       e.second = i + 1;
 | |
|       vec1.push_back(e);
 | |
| 
 | |
|       if (vec1.size() >= ZDL_VEC_SIZE)
 | |
|       {
 | |
|         inDL->insert(vec1);
 | |
|         vec1.clear();
 | |
|       }
 | |
| 
 | |
|       // 			inDL->insert(e);
 | |
|     }
 | |
| 
 | |
|     if (!vec1.empty())
 | |
|       inDL->insert(vec1);
 | |
| 
 | |
|     inDL->endOfInput();
 | |
| 
 | |
|     // 		cout << "making driver DataList" << endl;
 | |
| 
 | |
|     for (i = 0; i < ::count; i += 2)
 | |
|     {
 | |
|       e.first = i;
 | |
|       e.second = i + 1;
 | |
|       vec2.push_back(e);
 | |
| 
 | |
|       if (vec2.size() >= ZDL_VEC_SIZE)
 | |
|       {
 | |
|         dDL->insert(vec2);
 | |
|         vec2.clear();
 | |
|       }
 | |
| 
 | |
|       // 			dDL->insert(e);
 | |
|     }
 | |
| 
 | |
|     if (!vec2.empty())
 | |
|       dDL->insert(vec2);
 | |
| 
 | |
|     dDL->endOfInput();
 | |
| 
 | |
|     // 		cout << "reducing" << endl;
 | |
| 
 | |
|     ReduceStep rs(in, out, 5, 1, 0, 0, 0, 0);
 | |
|     rs.run();
 | |
|     rs.join();
 | |
| 
 | |
|     it = outDL->getIterator();
 | |
|     more = outDL->next(it, &e);
 | |
|     i = 0;
 | |
| 
 | |
|     while (more)
 | |
|     {
 | |
|       //  			cout << i << ": first: " << e.first << " second: " << e.second << endl;
 | |
|       CPPUNIT_ASSERT(e.first < ::count);
 | |
|       CPPUNIT_ASSERT(e.first % 2 == 0);
 | |
|       more = outDL->next(it, &e);
 | |
|       i++;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   void reduceStep_2()
 | |
|   {
 | |
|     JobStepAssociation in, out;
 | |
|     AnyDataList *inputADL, *driverADL, *outputADL;
 | |
|     AnyDataListSPtr inputSPtr, driverSPtr, outputSPtr;
 | |
|     WSDL<StringElementType>* outDL;
 | |
|     StringElementType e;
 | |
|     unsigned i;
 | |
|     int it;
 | |
|     bool more;
 | |
| 
 | |
|     // 		inDL = new WSDL<StringElementType>(1, 10000, fRm);
 | |
|     // 		dDL = new WSDL<StringElementType>(1, 10000, fRm);
 | |
|     ZDL<StringElementType>* inDL = new ZDL<StringElementType>(1, fRm);
 | |
|     ZDL<StringElementType>* dDL = new ZDL<StringElementType>(1, fRm);
 | |
| 
 | |
|     outDL = new WSDL<StringElementType>(1, 10000, fRm);
 | |
|     inputADL = new AnyDataList();
 | |
|     driverADL = new AnyDataList();
 | |
|     outputADL = new AnyDataList();
 | |
|     inputADL->stringZonedDL(inDL);
 | |
|     driverADL->stringZonedDL(dDL);
 | |
|     outputADL->strDataList(outDL);
 | |
|     inputSPtr.reset(inputADL);
 | |
|     driverSPtr.reset(driverADL);
 | |
|     outputSPtr.reset(outputADL);
 | |
| 
 | |
|     in.outAdd(inputSPtr);
 | |
|     in.outAdd(driverSPtr);
 | |
|     out.outAdd(outputSPtr);
 | |
|     vector<StringElementType> vec1;
 | |
|     vector<StringElementType> vec2;
 | |
| 
 | |
|     for (i = 0; i < ::count; i++)
 | |
|     {
 | |
|       e.first = i;
 | |
|       e.second = string("blahblahblahblahblah");
 | |
|       vec1.push_back(e);
 | |
| 
 | |
|       if (vec1.size() >= ZDL_VEC_SIZE)
 | |
|       {
 | |
|         inDL->insert(vec1);
 | |
|         vec1.clear();
 | |
|       }
 | |
| 
 | |
|       if (0 == i % 2)
 | |
|       {
 | |
|         e.second = string("blahblahblah");
 | |
|         vec2.push_back(e);
 | |
| 
 | |
|         if (vec2.size() >= ZDL_VEC_SIZE)
 | |
|         {
 | |
|           dDL->insert(vec2);
 | |
|           vec2.clear();
 | |
|         }
 | |
|       }
 | |
| 
 | |
|       // 			inDL->insert(e);
 | |
|     }
 | |
| 
 | |
|     if (!vec1.empty())
 | |
|       inDL->insert(vec1);
 | |
| 
 | |
|     inDL->endOfInput();
 | |
| 
 | |
|     /*		for (i = 0; i < ::count; i+=2) {
 | |
|                             e.first = i;
 | |
|                             e.second = string("blahblahblah");
 | |
|                             dDL->insert(e);
 | |
|                     }*/
 | |
|     if (!vec2.empty())
 | |
|       dDL->insert(vec2);
 | |
| 
 | |
|     dDL->endOfInput();
 | |
| 
 | |
|     ReduceStep rs(in, out, 5, 1, 0, 0, 0, 0);
 | |
|     rs.run();
 | |
|     rs.join();
 | |
| 
 | |
|     it = outDL->getIterator();
 | |
|     more = outDL->next(it, &e);
 | |
|     i = 0;
 | |
| 
 | |
|     while (more)
 | |
|     {
 | |
|       //    			cout << i << ": first: " << e.first << " second: " << e.second << endl;
 | |
|       CPPUNIT_ASSERT(e.first < ::count);
 | |
|       CPPUNIT_ASSERT(e.first % 2 == 0);
 | |
|       more = outDL->next(it, &e);
 | |
|       i++;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   void reduceStep_3()
 | |
|   {
 | |
|     JobStepAssociation in, out;
 | |
|     AnyDataList *inputADL, *driverADL, *outputADL;
 | |
|     AnyDataListSPtr inputSPtr, driverSPtr, outputSPtr;
 | |
|     WSDL<DoubleElementType>*inDL, *dDL, *outDL;
 | |
|     DoubleElementType e;
 | |
|     unsigned i;
 | |
|     int it;
 | |
|     bool more;
 | |
| 
 | |
|     inDL = new WSDL<DoubleElementType>(1, 100000, fRm);
 | |
|     dDL = new WSDL<DoubleElementType>(1, 100000, fRm);
 | |
|     outDL = new WSDL<DoubleElementType>(1, 100000, fRm);
 | |
|     inputADL = new AnyDataList();
 | |
|     driverADL = new AnyDataList();
 | |
|     outputADL = new AnyDataList();
 | |
|     inputADL->doubleDL(inDL);
 | |
|     driverADL->doubleDL(dDL);
 | |
|     outputADL->doubleDL(outDL);
 | |
|     inputSPtr.reset(inputADL);
 | |
|     driverSPtr.reset(driverADL);
 | |
|     outputSPtr.reset(outputADL);
 | |
| 
 | |
|     in.outAdd(inputSPtr);
 | |
|     in.outAdd(driverSPtr);
 | |
|     out.outAdd(outputSPtr);
 | |
| 
 | |
|     for (i = 0; i < ::count; i++)
 | |
|     {
 | |
|       e.first = i;
 | |
|       e.second = ((double)i) + 0.1;
 | |
|       inDL->insert(e);
 | |
|     }
 | |
| 
 | |
|     inDL->endOfInput();
 | |
| 
 | |
|     for (i = 0; i < ::count; i += 2)
 | |
|     {
 | |
|       e.first = i;
 | |
|       e.second = ((double)i) + 0.1;
 | |
|       dDL->insert(e);
 | |
|     }
 | |
| 
 | |
|     dDL->endOfInput();
 | |
| 
 | |
|     ReduceStep rs(in, out, 5, 1, 0, 0, 0, 0);
 | |
|     rs.run();
 | |
|     rs.join();
 | |
| 
 | |
|     it = outDL->getIterator();
 | |
|     more = outDL->next(it, &e);
 | |
|     i = 0;
 | |
| 
 | |
|     while (more)
 | |
|     {
 | |
|       //    			cout << i << ": first: " << e.first << " second: " << e.second << endl;
 | |
|       CPPUNIT_ASSERT(e.first < ::count);
 | |
|       CPPUNIT_ASSERT(e.first % 2 == 0);
 | |
|       more = outDL->next(it, &e);
 | |
|       i++;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   void reduceStep_4()
 | |
|   {
 | |
|     JobStepAssociation in, out;
 | |
|     AnyDataList *inputADL, *driverADL, *outputADL;
 | |
|     AnyDataListSPtr inputSPtr, driverSPtr, outputSPtr;
 | |
|     BucketDL<ElementType>*inDL, *dDL;
 | |
|     WSDL<ElementType>* outDL;
 | |
|     ElementType e;
 | |
|     unsigned i;
 | |
|     int it;
 | |
|     bool more;
 | |
| 
 | |
|     inDL = new BucketDL<ElementType>(10, 1, 100000, fRm);
 | |
|     dDL = new BucketDL<ElementType>(10, 1, 100000, fRm);
 | |
|     outDL = new WSDL<ElementType>(1, 100000, fRm);
 | |
|     inputADL = new AnyDataList();
 | |
|     driverADL = new AnyDataList();
 | |
|     outputADL = new AnyDataList();
 | |
|     inputADL->bucketDL(inDL);
 | |
|     driverADL->bucketDL(dDL);
 | |
|     outputADL->workingSetDL(outDL);
 | |
|     inputSPtr.reset(inputADL);
 | |
|     driverSPtr.reset(driverADL);
 | |
|     outputSPtr.reset(outputADL);
 | |
| 
 | |
|     in.outAdd(inputSPtr);
 | |
|     in.outAdd(driverSPtr);
 | |
|     out.outAdd(outputSPtr);
 | |
| 
 | |
|     // 		cout << "making input DataList" << endl;
 | |
| 
 | |
|     for (i = 0; i < ::count; i++)
 | |
|     {
 | |
|       e.first = i;
 | |
|       e.second = i + 1;
 | |
|       inDL->insert(e);
 | |
|     }
 | |
| 
 | |
|     inDL->endOfInput();
 | |
| 
 | |
|     // 		cout << "making driver DataList" << endl;
 | |
| 
 | |
|     for (i = 0; i < ::count; i += 2)
 | |
|     {
 | |
|       e.first = i;
 | |
|       e.second = i + 1;
 | |
|       dDL->insert(e);
 | |
|     }
 | |
| 
 | |
|     dDL->endOfInput();
 | |
| 
 | |
|     // 		cout << "reducing" << endl;
 | |
| 
 | |
|     ReduceStep rs(in, out, 5, 1, 0, 0, 0, 0);
 | |
|     rs.run();
 | |
|     rs.join();
 | |
| 
 | |
|     it = outDL->getIterator();
 | |
|     more = outDL->next(it, &e);
 | |
|     i = 0;
 | |
| 
 | |
|     while (more)
 | |
|     {
 | |
|       //  			cout << i << ": first: " << e.first << " second: " << e.second << endl;
 | |
|       CPPUNIT_ASSERT(e.first < ::count);
 | |
|       CPPUNIT_ASSERT(e.first % 2 == 0);
 | |
|       more = outDL->next(it, &e);
 | |
|       i++;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   void pnljoin_1()
 | |
|   {
 | |
|     DistributedEngineComm* dec;
 | |
|     boost::shared_ptr<CalpontSystemCatalog> cat;
 | |
|     ElementType e;
 | |
|     int it;
 | |
|     bool more;
 | |
| 
 | |
|     dec = DistributedEngineComm::instance(fRm);
 | |
|     // 	dec = DistributedEngineComm::instance("./config-dec.xml");
 | |
|     cat = CalpontSystemCatalog::makeCalpontSystemCatalog();
 | |
| 
 | |
|     JobStepAssociation inJs;
 | |
|     JobStepAssociation outJs;
 | |
| 
 | |
|     AnyDataListSPtr spdl1(new AnyDataList());
 | |
|     WSDL<ElementType>* dl1 = new WSDL<ElementType>(1, 100, fRm);
 | |
|     spdl1->workingSetDL(dl1);
 | |
|     inJs.outAdd(spdl1);
 | |
| 
 | |
|     AnyDataListSPtr spdl2(new AnyDataList());
 | |
|     WSDL<ElementType>* dl2 = new WSDL<ElementType>(1, 100, fRm);
 | |
|     spdl2->workingSetDL(dl2);
 | |
|     outJs.outAdd(spdl2);
 | |
| 
 | |
|     /* These values are unique to Pat's DB files unfortunately. */
 | |
|     /* Fill in the value list */
 | |
|     dl1->insert(ElementType(1, 3179));  // row 10 in the target
 | |
|     dl1->insert(ElementType(2, 3191));  // row 12
 | |
|     dl1->insert(ElementType(3, 3207));  // row 14
 | |
|     dl1->insert(ElementType(4, 3318));  // row 20  OOO
 | |
|     dl1->insert(ElementType(5, 3242));  // row 16
 | |
|     dl1->insert(ElementType(6, 3289));  // row 18
 | |
|     dl1->insert(ElementType(7, 3191));  // duplicate value of row 14
 | |
| 
 | |
|     dl1->endOfInput();
 | |
|     dl1->OID(1003);
 | |
| 
 | |
|     PNLJoin joiner(inJs, outJs, dec, cat, 12349, 1000, 1, 0, fRm);
 | |
| 
 | |
|     joiner.run();
 | |
|     joiner.join();
 | |
| 
 | |
|     it = dl2->getIterator();
 | |
|     more = dl2->next(it, &e);
 | |
| 
 | |
|     while (more)
 | |
|     {
 | |
| #ifdef DEBUG
 | |
|       cout << "first: " << e.first << " second: " << e.second << endl;
 | |
| #endif
 | |
|       more = dl2->next(it, &e);
 | |
|     }
 | |
|   }
 | |
|   void pnljoin_2()
 | |
|   {
 | |
|     DistributedEngineComm* dec;
 | |
|     boost::shared_ptr<CalpontSystemCatalog> cat;
 | |
|     ElementType e;
 | |
|     int i, it;
 | |
|     bool more;
 | |
| 
 | |
|     dec = DistributedEngineComm::instance(fRm);
 | |
|     // 	dec = DistributedEngineComm::instance("./config-dec.xml");
 | |
|     cat = CalpontSystemCatalog::makeCalpontSystemCatalog();
 | |
| 
 | |
|     JobStepAssociation inJs;
 | |
|     JobStepAssociation outJs;
 | |
| 
 | |
|     AnyDataListSPtr spdl1(new AnyDataList());
 | |
|     WSDL<ElementType>* valueList = new WSDL<ElementType>(1, 100, fRm);
 | |
|     spdl1->workingSetDL(valueList);
 | |
|     inJs.outAdd(spdl1);
 | |
| 
 | |
|     AnyDataListSPtr spdl2(new AnyDataList());
 | |
|     AnyDataListSPtr spdl3(new AnyDataList());
 | |
|     WSDL<ElementType>* colResults = new WSDL<ElementType>(1, 100, fRm);
 | |
|     WSDL<ElementType>* inputRidList = new WSDL<ElementType>(1, 100, fRm);
 | |
|     spdl2->workingSetDL(colResults);
 | |
|     spdl3->workingSetDL(inputRidList);
 | |
|     outJs.outAdd(spdl2);
 | |
|     inJs.outAdd(spdl3);
 | |
| 
 | |
|     /* These values are unique to Pat's DB files unfortunately. */
 | |
|     /* Fill in the value list */
 | |
|     valueList->insert(ElementType(1, 3179));  // row 10 in the target
 | |
|     valueList->insert(ElementType(2, 3191));  // row 12
 | |
|     valueList->insert(ElementType(3, 3207));  // row 14
 | |
|     valueList->insert(ElementType(4, 3318));  // row 20  OOO
 | |
|     valueList->insert(ElementType(5, 3242));  // row 16
 | |
|     valueList->insert(ElementType(6, 3289));  // row 18
 | |
|     valueList->insert(ElementType(7, 3191));  // duplicate value of row 14
 | |
| 
 | |
|     valueList->endOfInput();
 | |
|     valueList->OID(1003);
 | |
| 
 | |
|     // supply a ridlist with row 16 missing; make sure it's missing in the result
 | |
|     for (i = 0; i < 25; i++)
 | |
|       if (i != 16)
 | |
|         inputRidList->insert(ElementType(i, i));
 | |
| 
 | |
|     inputRidList->endOfInput();
 | |
| 
 | |
|     PNLJoin joiner(inJs, outJs, dec, cat, 12349, 1000, 1, 0, fRm);
 | |
| 
 | |
|     joiner.run();
 | |
|     joiner.join();
 | |
| 
 | |
|     it = colResults->getIterator();
 | |
|     more = colResults->next(it, &e);
 | |
| 
 | |
|     while (more)
 | |
|     {
 | |
| #ifdef DEBUG
 | |
|       cout << "first: " << e.first << " second: " << e.second << endl;
 | |
| #endif
 | |
|       more = colResults->next(it, &e);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   void pnljoin_3()
 | |
|   {
 | |
|     DistributedEngineComm* dec;
 | |
|     boost::shared_ptr<CalpontSystemCatalog> cat;
 | |
|     ElementType e;
 | |
|     int i, it;
 | |
|     bool more;
 | |
| 
 | |
|     dec = DistributedEngineComm::instance(fRm);
 | |
|     // 	dec = DistributedEngineComm::instance("./config-dec.xml");
 | |
|     cat = CalpontSystemCatalog::makeCalpontSystemCatalog();
 | |
| 
 | |
|     AnyDataListSPtr spdl1(new AnyDataList());
 | |
|     AnyDataListSPtr spdl3(new AnyDataList());
 | |
|     WSDL<ElementType>* valueList = new WSDL<ElementType>(1, 100, fRm);
 | |
|     WSDL<ElementType>* inputRidList = new WSDL<ElementType>(1, 100, fRm);
 | |
|     spdl1->workingSetDL(valueList);
 | |
|     spdl3->workingSetDL(inputRidList);
 | |
| 
 | |
|     JobStepAssociation inJs;
 | |
|     JobStepAssociation outJs;
 | |
| 
 | |
|     inJs.outAdd(spdl1);
 | |
|     inJs.outAdd(spdl3);
 | |
| 
 | |
|     AnyDataListSPtr spdl2(new AnyDataList());
 | |
|     AnyDataListSPtr spdl4(new AnyDataList());
 | |
|     WSDL<ElementType>* colResults = new WSDL<ElementType>(1, 100, fRm);
 | |
|     WSDL<ElementType>* reducedRidList = new WSDL<ElementType>(1, 100, fRm);
 | |
|     spdl2->workingSetDL(colResults);
 | |
|     spdl4->workingSetDL(reducedRidList);
 | |
|     outJs.outAdd(spdl2);
 | |
|     outJs.outAdd(spdl4);
 | |
| 
 | |
|     /* These values are unique to Pat's DB files unfortunately. */
 | |
|     /* Fill in the value list */
 | |
|     valueList->insert(ElementType(10, 3179));  // row 10 in the target
 | |
|     valueList->insert(ElementType(12, 3191));  // row 12
 | |
|     valueList->insert(
 | |
|         ElementType(14541513, 3207));  // row 14 - on output should be in colResults, not in reducedridlist
 | |
|     valueList->insert(ElementType(20, 3318));  // row 20  OOO
 | |
|     valueList->insert(ElementType(16, 3242));  // row 16
 | |
|     valueList->insert(ElementType(18, 3289));  // row 18
 | |
| 
 | |
|     // XXXPAT: Duplicates here can end up in the reducedRidList.  Technically it's
 | |
|     // correct, but do we want that or not?
 | |
|     // 		valueList->insert(ElementType(12, 3191));  // duplicate value of row 12
 | |
| 
 | |
|     valueList->endOfInput();
 | |
|     valueList->OID(1003);
 | |
| 
 | |
|     // supply a ridlist with row 16 missing; make sure it's missing in the result
 | |
|     for (i = 0; i < 25; i++)
 | |
|       if (i != 16)
 | |
|         inputRidList->insert(ElementType(i, i));
 | |
| 
 | |
|     inputRidList->endOfInput();
 | |
| 
 | |
|     PNLJoin joiner(inJs, outJs, dec, cat, 12349, 1000, 1, 0, fRm);
 | |
| 
 | |
|     joiner.run();
 | |
|     joiner.join();
 | |
| 
 | |
|     it = colResults->getIterator();
 | |
|     more = colResults->next(it, &e);
 | |
| #ifdef DEBUG
 | |
|     cout << "ColResults:" << endl;
 | |
| #endif
 | |
| 
 | |
|     while (more)
 | |
|     {
 | |
| #ifdef DEBUG
 | |
|       cout << "   first: " << e.first << " second: " << e.second << endl;
 | |
| #endif
 | |
|       more = colResults->next(it, &e);
 | |
|     }
 | |
| 
 | |
|     it = reducedRidList->getIterator();
 | |
|     more = reducedRidList->next(it, &e);
 | |
| #ifdef DEBUG
 | |
|     cout << "Reduced Rid List:" << endl;
 | |
| #endif
 | |
| 
 | |
|     while (more)
 | |
|     {
 | |
| #ifdef DEBUG
 | |
|       cout << "   first: " << e.first << " second: " << e.second << endl;
 | |
| #endif
 | |
|       more = reducedRidList->next(it, &e);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   void unionStep_1()
 | |
|   {
 | |
|     JobStepAssociation in, out;
 | |
|     AnyDataList *inputADL, *driverADL, *outputADL;
 | |
|     AnyDataListSPtr inputSPtr, driverSPtr, outputSPtr;
 | |
|     WSDL<ElementType>* outDL;
 | |
|     set<ElementType> s;
 | |
|     set<ElementType>::iterator sIt;
 | |
|     ElementType e;
 | |
|     unsigned i;
 | |
|     int it;
 | |
|     bool more;
 | |
| 
 | |
|     ZDL<ElementType>* inDL = new ZDL<ElementType>(1, fRm);
 | |
|     ZDL<ElementType>* dDL = new ZDL<ElementType>(1, fRm);
 | |
| 
 | |
|     outDL = new WSDL<ElementType>(1, 100000, fRm);
 | |
|     inputADL = new AnyDataList();
 | |
|     driverADL = new AnyDataList();
 | |
|     outputADL = new AnyDataList();
 | |
|     inputADL->zonedDL(inDL);
 | |
|     driverADL->zonedDL(dDL);
 | |
| 
 | |
|     outputADL->workingSetDL(outDL);
 | |
|     inputSPtr.reset(inputADL);
 | |
|     driverSPtr.reset(driverADL);
 | |
|     outputSPtr.reset(outputADL);
 | |
| 
 | |
|     in.outAdd(inputSPtr);
 | |
|     in.outAdd(driverSPtr);
 | |
|     out.outAdd(outputSPtr);
 | |
| 
 | |
|     //  		cout << "making input DataList" << endl;
 | |
|     vector<ElementType> vec1;
 | |
|     vector<ElementType> vec2;
 | |
| 
 | |
|     for (i = 0; i < ::count; i++)
 | |
|     {
 | |
|       e.first = i;
 | |
|       e.second = i + 1;
 | |
|       vec1.push_back(e);
 | |
| 
 | |
|       if (vec1.size() >= ZDL_VEC_SIZE)
 | |
|       {
 | |
|         inDL->insert(vec1);
 | |
|         vec1.clear();
 | |
|       }
 | |
| 
 | |
|       // 			inDL->insert(e);
 | |
|     }
 | |
| 
 | |
|     if (!vec1.empty())
 | |
|       inDL->insert(vec1);
 | |
| 
 | |
|     inDL->endOfInput();
 | |
| 
 | |
|     // 		cout << "making driver DataList" << endl;
 | |
| 
 | |
|     for (i = 0; i < ::count; i += 2)
 | |
|     {
 | |
|       e.first = i;
 | |
|       e.second = i + 1;
 | |
|       vec2.push_back(e);
 | |
| 
 | |
|       if (vec2.size() >= ZDL_VEC_SIZE)
 | |
|       {
 | |
|         dDL->insert(vec2);
 | |
|         vec2.clear();
 | |
|       }
 | |
| 
 | |
|       // 			dDL->insert(e);
 | |
|     }
 | |
| 
 | |
|     if (!vec2.empty())
 | |
|       dDL->insert(vec2);
 | |
| 
 | |
|     dDL->endOfInput();
 | |
| 
 | |
|     //  		cout << "unionizing" << endl;
 | |
| 
 | |
|     UnionStep rs(in, out, 50, 5, 1, 0, 0, 0);
 | |
|     rs.run();
 | |
|     rs.join();
 | |
| 
 | |
|     CPPUNIT_ASSERT(outDL->totalSize() == inDL->totalSize());
 | |
| 
 | |
|     it = outDL->getIterator();
 | |
| 
 | |
|     for (i = 0, more = outDL->next(it, &e); more; more = outDL->next(it, &e), i++)
 | |
|       s.insert(e);
 | |
| 
 | |
|     CPPUNIT_ASSERT(outDL->totalSize() == i);
 | |
|     CPPUNIT_ASSERT(i == ::count);
 | |
| 
 | |
|     CPPUNIT_ASSERT(s.size() == outDL->totalSize());  // verifies no duplicates in outDL
 | |
| 
 | |
|     for (i = 0, sIt = s.begin(); sIt != s.end(); sIt++, i++)
 | |
|       CPPUNIT_ASSERT(sIt->first == i);
 | |
| 
 | |
|     CPPUNIT_ASSERT(i == ::count);  // verifies they all exist.
 | |
|   }
 | |
| 
 | |
|   void unionStep_2()
 | |
|   {
 | |
|     JobStepAssociation in, out;
 | |
|     AnyDataList *inputADL, *driverADL, *outputADL;
 | |
|     AnyDataListSPtr inputSPtr, driverSPtr, outputSPtr;
 | |
|     WSDL<ElementType>*inDL, *dDL, *outDL;
 | |
|     set<ElementType> s;
 | |
|     set<ElementType>::iterator sIt;
 | |
|     ElementType e;
 | |
|     unsigned i;
 | |
|     int it;
 | |
|     bool more;
 | |
| 
 | |
|     inDL = new WSDL<ElementType>(1, 100000, fRm);
 | |
|     dDL = new WSDL<ElementType>(1, 100000, fRm);
 | |
|     outDL = new WSDL<ElementType>(1, 100000, fRm);
 | |
|     inputADL = new AnyDataList();
 | |
|     driverADL = new AnyDataList();
 | |
|     outputADL = new AnyDataList();
 | |
|     inputADL->workingSetDL(inDL);
 | |
|     driverADL->workingSetDL(dDL);
 | |
|     outputADL->workingSetDL(outDL);
 | |
|     inputSPtr.reset(inputADL);
 | |
|     driverSPtr.reset(driverADL);
 | |
|     outputSPtr.reset(outputADL);
 | |
| 
 | |
|     in.outAdd(inputSPtr);
 | |
|     in.outAdd(driverSPtr);
 | |
|     out.outAdd(outputSPtr);
 | |
| 
 | |
|     //  		cout << "making input DataList" << endl;
 | |
| 
 | |
|     for (i = 0; i < ::count; i += 2)
 | |
|     {
 | |
|       e.first = i;
 | |
|       e.second = i + 1;
 | |
|       inDL->insert(e);
 | |
|     }
 | |
| 
 | |
|     inDL->endOfInput();
 | |
| 
 | |
|     //  		cout << "making driver DataList" << endl;
 | |
| 
 | |
|     for (i = 0; i < ::count; i++)
 | |
|     {
 | |
|       e.first = i;
 | |
|       e.second = i + 1;
 | |
|       dDL->insert(e);
 | |
|     }
 | |
| 
 | |
|     dDL->endOfInput();
 | |
| 
 | |
|     //  		cout << "unionizing" << endl;
 | |
| 
 | |
|     UnionStep rs(in, out, 50, 5, 1, 0, 0, 0);
 | |
|     rs.run();
 | |
|     rs.join();
 | |
| 
 | |
|     CPPUNIT_ASSERT(outDL->totalSize() == dDL->totalSize());
 | |
| 
 | |
|     it = outDL->getIterator();
 | |
| 
 | |
|     for (i = 0, more = outDL->next(it, &e); more; more = outDL->next(it, &e), i++)
 | |
|       s.insert(e);
 | |
| 
 | |
|     CPPUNIT_ASSERT(outDL->totalSize() == i);
 | |
|     CPPUNIT_ASSERT(i == ::count);
 | |
| 
 | |
|     CPPUNIT_ASSERT(s.size() == outDL->totalSize());  // verifies no duplicates in outDL
 | |
| 
 | |
|     for (i = 0, sIt = s.begin(); sIt != s.end(); sIt++, i++)
 | |
|       CPPUNIT_ASSERT(sIt->first == i);
 | |
| 
 | |
|     CPPUNIT_ASSERT(i == ::count);  // verifies they all exist.
 | |
|   }
 | |
| 
 | |
|   void unionStep_3()
 | |
|   {
 | |
|     JobStepAssociation in, out;
 | |
|     AnyDataList *inputADL, *driverADL, *outputADL;
 | |
|     AnyDataListSPtr inputSPtr, driverSPtr, outputSPtr;
 | |
|     WSDL<ElementType>* outDL;
 | |
|     BucketDL<ElementType>*inDL, *dDL;
 | |
|     set<ElementType> s;
 | |
|     set<ElementType>::iterator sIt;
 | |
|     ElementType e;
 | |
|     unsigned i;
 | |
|     int it;
 | |
|     bool more;
 | |
| 
 | |
|     inDL = new BucketDL<ElementType>(10, 1, 100000, fRm);
 | |
|     dDL = new BucketDL<ElementType>(10, 1, 100000, fRm);
 | |
|     outDL = new WSDL<ElementType>(1, 100000, fRm);
 | |
|     inputADL = new AnyDataList();
 | |
|     driverADL = new AnyDataList();
 | |
|     outputADL = new AnyDataList();
 | |
|     inputADL->bucketDL(inDL);
 | |
|     driverADL->bucketDL(dDL);
 | |
|     outputADL->workingSetDL(outDL);
 | |
|     inputSPtr.reset(inputADL);
 | |
|     driverSPtr.reset(driverADL);
 | |
|     outputSPtr.reset(outputADL);
 | |
| 
 | |
|     in.outAdd(inputSPtr);
 | |
|     in.outAdd(driverSPtr);
 | |
|     out.outAdd(outputSPtr);
 | |
| 
 | |
|     //  		cout << "making input DataList" << endl;
 | |
| 
 | |
|     for (i = 0; i < ::count; i++)
 | |
|     {
 | |
|       e.first = i;
 | |
|       e.second = i + 1;
 | |
|       inDL->insert(e);
 | |
|     }
 | |
| 
 | |
|     inDL->endOfInput();
 | |
| 
 | |
|     //  		cout << "making driver DataList" << endl;
 | |
| 
 | |
|     for (; i < ::count * 2; i++)
 | |
|     {
 | |
|       e.first = i;
 | |
|       e.second = i + 1;
 | |
|       dDL->insert(e);
 | |
|     }
 | |
| 
 | |
|     dDL->endOfInput();
 | |
| 
 | |
|     //  		cout << "unionizing" << endl;
 | |
| 
 | |
|     UnionStep rs(in, out, 50, 5, 1, 0, 0, 0);
 | |
|     rs.run();
 | |
|     rs.join();
 | |
| 
 | |
|     CPPUNIT_ASSERT(outDL->totalSize() == 2 * ::count);
 | |
| 
 | |
|     it = outDL->getIterator();
 | |
| 
 | |
|     for (i = 0, more = outDL->next(it, &e); more; more = outDL->next(it, &e), i++)
 | |
|       s.insert(e);
 | |
| 
 | |
|     CPPUNIT_ASSERT(outDL->totalSize() == i);
 | |
|     CPPUNIT_ASSERT(i == 2 * ::count);
 | |
| 
 | |
|     CPPUNIT_ASSERT(s.size() == outDL->totalSize());  // verifies no duplicates in outDL
 | |
| 
 | |
|     for (i = 0, sIt = s.begin(); sIt != s.end(); sIt++, i++)
 | |
|       CPPUNIT_ASSERT(sIt->first == i);
 | |
| 
 | |
|     CPPUNIT_ASSERT(i == 2 * ::count);  // verifies they all exist.
 | |
|   }
 | |
| };
 | |
| 
 | |
| CPPUNIT_TEST_SUITE_REGISTRATION(JobStepDriver);
 | |
| 
 | |
| int main(int argc, char** argv)
 | |
| {
 | |
|   CppUnit::TextUi::TestRunner runner;
 | |
|   CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry();
 | |
|   runner.addTest(registry.makeTest());
 | |
|   bool wasSuccessful = runner.run("", false);
 | |
|   return (wasSuccessful ? 0 : 1);
 | |
| }
 |