1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-08 14:22:09 +03:00

Merge branch 'develop' into MCOL-3585

This commit is contained in:
Jose Rojas
2019-12-05 10:03:40 -06:00
committed by GitHub
212 changed files with 2749 additions and 1805 deletions

View File

@@ -35,7 +35,5 @@ ADD_LIBRARY(ddlpackage SHARED
${CMAKE_CURRENT_SOURCE_DIR}/ddl-scan.cpp
)
SET_TARGET_PROPERTIES(ddlpackage PROPERTIES VERSION 1.0.0 SOVERSION 1)
INSTALL(TARGETS ddlpackage DESTINATION ${ENGINE_LIBDIR} COMPONENT libs)
INSTALL(TARGETS ddlpackage DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-libs)

View File

@@ -16,7 +16,5 @@ add_library(ddlpackageproc SHARED ${ddlpackageproc_LIB_SRCS})
target_link_libraries(ddlpackageproc ${NETSNMP_LIBRARIES})
set_target_properties(ddlpackageproc PROPERTIES VERSION 1.0.0 SOVERSION 1)
install(TARGETS ddlpackageproc DESTINATION ${ENGINE_LIBDIR} COMPONENT libs)
install(TARGETS ddlpackageproc DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-libs)

View File

@@ -37,7 +37,5 @@ ADD_LIBRARY(dmlpackage SHARED
)
SET_TARGET_PROPERTIES(dmlpackage PROPERTIES VERSION 1.0.0 SOVERSION 1)
INSTALL(TARGETS dmlpackage DESTINATION ${ENGINE_LIBDIR} COMPONENT libs)
INSTALL(TARGETS dmlpackage DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-libs)

View File

@@ -17,8 +17,6 @@ add_library(dmlpackageproc SHARED ${dmlpackageproc_LIB_SRCS})
target_link_libraries(dmlpackageproc ${NETSNMP_LIBRARIES})
set_target_properties(dmlpackageproc PROPERTIES VERSION 1.0.0 SOVERSION 1)
install(TARGETS dmlpackageproc DESTINATION ${ENGINE_LIBDIR} COMPONENT libs)
install(TARGETS dmlpackageproc DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-libs)

View File

@@ -48,7 +48,5 @@ add_library(execplan SHARED ${execplan_LIB_SRCS})
target_link_libraries(execplan ${NETSNMP_LIBRARIES})
set_target_properties(execplan PROPERTIES VERSION 1.0.0 SOVERSION 1)
install(TARGETS execplan DESTINATION ${ENGINE_LIBDIR} COMPONENT libs)
install(TARGETS execplan DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-libs)

View File

@@ -430,9 +430,9 @@ public:
struct TableAliasName
{
TableAliasName (): fisColumnStore (true) {}
TableAliasName (std::string sch, std::string tb, std::string al) :
TableAliasName (const std::string &sch, const std::string &tb, const std::string &al) :
schema (sch), table (tb), alias (al), fisColumnStore(true) {}
TableAliasName (std::string sch, std::string tb, std::string al, std::string v) :
TableAliasName (const std::string &sch, const std::string &tb, const std::string &al, const std::string &v) :
schema (sch), table (tb), alias (al), view(v), fisColumnStore(true) {}
std::string schema;
std::string table;

View File

@@ -59,13 +59,11 @@ set(joblist_LIB_SRCS
add_library(joblist SHARED ${joblist_LIB_SRCS})
set_target_properties(joblist PROPERTIES VERSION 1.0.0 SOVERSION 1)
install(TARGETS joblist DESTINATION ${ENGINE_LIBDIR} COMPONENT libs)
install(TARGETS joblist DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-libs)
if (WITH_ORDERBY_UT)
add_executable(job_orderby_tests orderby-tests.cpp)
target_link_libraries(job_orderby_tests ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit)
install(TARGETS job_orderby_tests DESTINATION ${ENGINE_BINDIR} COMPONENT platform)
install(TARGETS job_orderby_tests DESTINATION ${ENGINE_BINDIR} COMPONENT columnstore-platform)
endif()

View File

@@ -509,7 +509,7 @@ void ExpressionStep::updateInputIndex(map<uint32_t, uint32_t>& indexMap, const J
CalpontSystemCatalog::OID oid = sc->oid();
CalpontSystemCatalog::OID dictOid = 0;
CalpontSystemCatalog::ColType ct;
uint32_t key = fColumnKeys[distance(fColumns.begin(), it)];
uint32_t key = fColumnKeys[std::distance(fColumns.begin(), it)];
if (sc->schemaName().empty())
{

View File

@@ -263,7 +263,7 @@ void GroupConcatInfo::mapColumns(const RowGroup& projRG)
}
else
{
idx = distance(keys.begin(), i3);
idx = std::distance(keys.begin(), i3);
}
(*k)->fOrderCond.push_back(make_pair(idx, i2->second));

View File

@@ -1007,12 +1007,12 @@ bool combineJobStepsByTable(TableInfoMap::iterator& mit, JobInfo& jobInfo)
unsigned itInc = 1; // iterator increase number
unsigned numOfStepsAddToBps = 0; // # steps to be added into TBPS
if ((distance(it, end) > 2 &&
if ((std::distance(it, end) > 2 &&
dynamic_cast<pDictionaryScan*>(it->get()) != NULL &&
(dynamic_cast<pColScanStep*>((it + 1)->get()) != NULL ||
dynamic_cast<pColStep*>((it + 1)->get()) != NULL) &&
dynamic_cast<TupleHashJoinStep*>((it + 2)->get()) != NULL) ||
(distance(it, end) > 1 &&
(std::distance(it, end) > 1 &&
dynamic_cast<pDictionaryScan*>(it->get()) != NULL &&
dynamic_cast<TupleHashJoinStep*>((it + 1)->get()) != NULL))
{
@@ -1053,7 +1053,7 @@ bool combineJobStepsByTable(TableInfoMap::iterator& mit, JobInfo& jobInfo)
itInc = 1;
numOfStepsAddToBps = 0;
}
else if (distance(begin, it) > 1 &&
else if (std::distance(begin, it) > 1 &&
(dynamic_cast<pDictionaryScan*>((it - 1)->get()) != NULL ||
dynamic_cast<pDictionaryScan*>((it - 2)->get()) != NULL) &&
dynamic_cast<TupleHashJoinStep*>(it->get()) != NULL)
@@ -1062,14 +1062,14 @@ bool combineJobStepsByTable(TableInfoMap::iterator& mit, JobInfo& jobInfo)
itInc = 1;
numOfStepsAddToBps = 0;
}
else if (distance(it, end) > 2 &&
else if (std::distance(it, end) > 2 &&
dynamic_cast<pColStep*>((it + 1)->get()) != NULL &&
dynamic_cast<FilterStep*>((it + 2)->get()) != NULL)
{
itInc = 3;
numOfStepsAddToBps = 3;
}
else if (distance(it, end) > 3 &&
else if (std::distance(it, end) > 3 &&
dynamic_cast<pColStep*>((it + 1)->get()) != NULL &&
dynamic_cast<pDictionaryStep*>((it + 2)->get()) != NULL &&
dynamic_cast<FilterStep*>((it + 3)->get()) != NULL)
@@ -1077,7 +1077,7 @@ bool combineJobStepsByTable(TableInfoMap::iterator& mit, JobInfo& jobInfo)
itInc = 4;
numOfStepsAddToBps = 4;
}
else if (distance(it, end) > 3 &&
else if (std::distance(it, end) > 3 &&
dynamic_cast<pDictionaryStep*>((it + 1)->get()) != NULL &&
dynamic_cast<pColStep*>((it + 2)->get()) != NULL &&
dynamic_cast<FilterStep*>((it + 3)->get()) != NULL)
@@ -1085,7 +1085,7 @@ bool combineJobStepsByTable(TableInfoMap::iterator& mit, JobInfo& jobInfo)
itInc = 4;
numOfStepsAddToBps = 4;
}
else if (distance(it, end) > 4 &&
else if (std::distance(it, end) > 4 &&
dynamic_cast<pDictionaryStep*>((it + 1)->get()) != NULL &&
dynamic_cast<pColStep*>((it + 2)->get()) != NULL &&
dynamic_cast<pDictionaryStep*>((it + 3)->get()) != NULL &&
@@ -1094,7 +1094,7 @@ bool combineJobStepsByTable(TableInfoMap::iterator& mit, JobInfo& jobInfo)
itInc = 5;
numOfStepsAddToBps = 5;
}
else if (distance(it, end) > 1 &&
else if (std::distance(it, end) > 1 &&
(dynamic_cast<pColStep*>(it->get()) != NULL ||
dynamic_cast<pColScanStep*>(it->get()) != NULL) &&
dynamic_cast<pDictionaryStep*>((it + 1)->get()) != NULL)
@@ -2060,7 +2060,7 @@ uint32_t getKeyIndex(uint32_t key, const RowGroup& rg)
if (i == rg.getKeys().end())
throw runtime_error("No key found.");
return distance(rg.getKeys().begin(), i);
return std::distance(rg.getKeys().begin(), i);
}

View File

@@ -1110,11 +1110,11 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
else
it = pcv.insert(pcv.end(), srcp);
projectKeys.insert(projectKeys.begin() + distance(pcv.begin(), it), tupleKey);
projectKeys.insert(projectKeys.begin() + std::distance(pcv.begin(), it), tupleKey);
}
else if (doDistinct) // @bug4250, move forward distinct column if necessary.
{
uint32_t pos = distance(projectKeys.begin(), keyIt);
uint32_t pos = std::distance(projectKeys.begin(), keyIt);
if (pos >= lastGroupByPos)
{
@@ -1263,11 +1263,11 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
else
it = pcv.insert(pcv.end(), srcp);
projectKeys.insert(projectKeys.begin() + distance(pcv.begin(), it), tupleKey);
projectKeys.insert(projectKeys.begin() + std::distance(pcv.begin(), it), tupleKey);
}
else if (doDistinct) // @bug4250, move forward distinct column if necessary.
{
uint32_t pos = distance(projectKeys.begin(), keyIt);
uint32_t pos = std::distance(projectKeys.begin(), keyIt);
if (pos >= lastGroupByPos)
{
@@ -1392,7 +1392,7 @@ void changePcolStepToPcolScan(JobStepVector::iterator& it, JobStepVector::iterat
{
//If we have a pDictionaryScan-pColStep duo, then change the pColStep
if (typeid(*(it->get())) == typeid(pDictionaryScan) &&
distance(it, end) > 1 &&
std::distance(it, end) > 1 &&
typeid(*((it + 1)->get())) == typeid(pColStep))
{
++it;

View File

@@ -310,11 +310,8 @@ void pDictionaryScan::addFilter(int8_t COP, const string& value)
}
else
{
uint8_t* s = (uint8_t*)alloca(value.size() * sizeof(uint8_t));
memcpy(s, value.data(), value.size());
fFilterString << (uint16_t) value.size();
fFilterString.append(s, value.size());
fFilterString.append((const uint8_t*)value.data(), value.size());
}
}

View File

@@ -80,6 +80,7 @@ ResourceManager::ResourceManager(bool runningInExeMgr) :
fJlProcessorThreadsPerScan(defaultProcessorThreadsPerScan),
fJlNumScanReceiveThreads(defaultScanReceiveThreads),
fTwNumThreads(defaultNumThreads),
fJlMaxOutstandingRequests(defaultMaxOutstandingRequests),
fHJUmMaxMemorySmallSideDistributor(fHashJoinStr,
"UmMaxMemorySmallSide",
getUintVal(fHashJoinStr, "TotalUmMaxMemorySmallSide", defaultTotalUmMemory),
@@ -87,8 +88,7 @@ ResourceManager::ResourceManager(bool runningInExeMgr) :
0),
fHJPmMaxMemorySmallSideSessionMap(
getUintVal(fHashJoinStr, "PmMaxMemorySmallSide", defaultHJPmMaxMemorySmallSide)),
isExeMgr(runningInExeMgr),
fJlMaxOutstandingRequests(defaultMaxOutstandingRequests)
isExeMgr(runningInExeMgr)
{
int temp;
int configNumCores = -1;
@@ -231,7 +231,7 @@ ResourceManager::ResourceManager(bool runningInExeMgr) :
fAggNumBuckets = fAggNumThreads * 4;
else
fAggNumBuckets = fConfig->uFromText(nb);
nr = fConfig->getConfig("RowAggregation", "RowAggrRowGroupsPerThread");
if (nr.empty())
@@ -415,4 +415,3 @@ bool ResourceManager::getMemory(int64_t amount, boost::shared_ptr<int64_t> sessi
} //namespace

View File

@@ -54,6 +54,7 @@ using namespace funcexp;
using namespace querytele;
#include "atomicops.h"
#include "spinlock.h"
namespace joblist
{
@@ -73,8 +74,6 @@ TupleHashJoinStep::TupleHashJoinStep(const JobInfo& jobInfo) :
fTupleId2(-1),
fCorrelatedSide(0),
resourceManager(jobInfo.rm),
totalUMMemoryUsage(0),
rgDataSize(0),
runRan(false),
joinRan(false),
largeSideIndex(1),
@@ -84,7 +83,8 @@ TupleHashJoinStep::TupleHashJoinStep(const JobInfo& jobInfo) :
fTokenJoin(-1),
fStatsMutexPtr(new boost::mutex()),
fFunctionJoinKeys(jobInfo.keyInfo->functionJoinKeys),
sessionMemLimit(jobInfo.umMemLimit)
sessionMemLimit(jobInfo.umMemLimit),
rgdLock(false)
{
/* Need to figure out how much memory these use...
Overhead storing 16 byte elements is about 32 bytes. That
@@ -116,11 +116,13 @@ TupleHashJoinStep::TupleHashJoinStep(const JobInfo& jobInfo) :
else
allowDJS = false;
numCores = resourceManager->numCores();
if (numCores <= 0)
numCores = 8;
/* Debugging, rand() is used to simulate failures
time_t t = time(NULL);
srand(t);
*/
}
TupleHashJoinStep::~TupleHashJoinStep()
@@ -130,8 +132,8 @@ TupleHashJoinStep::~TupleHashJoinStep()
if (ownsOutputDL)
delete outputDL;
if (totalUMMemoryUsage != 0)
resourceManager->returnMemory(totalUMMemoryUsage, sessionMemLimit);
for (uint i = 0 ; i < smallDLs.size(); i++)
resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit);
//cout << "deallocated THJS, UM memory available: " << resourceManager.availableMemory() << endl;
}
@@ -200,140 +202,240 @@ void TupleHashJoinStep::join()
}
}
/* Index is which small input to read. */
void TupleHashJoinStep::smallRunnerFcn(uint32_t index)
// simple sol'n. Poll mem usage of Joiner once per second. Request mem
// increase after the fact. Failure to get mem will be detected and handled by
// the threads inserting into Joiner.
void TupleHashJoinStep::trackMem(uint index)
{
uint64_t i;
bool more, flippedUMSwitch = false, gotMem;
RGData oneRG;
//shared_array<uint8_t> oneRG;
Row r;
boost::shared_ptr<TupleJoiner> joiner = joiners[index];
ssize_t memBefore = 0, memAfter = 0;
bool gotMem;
boost::unique_lock<boost::mutex> scoped(memTrackMutex);
while (!stopMemTracking)
{
memAfter = joiner->getMemUsage();
if (memAfter != memBefore)
{
gotMem = resourceManager->getMemory(memAfter - memBefore, sessionMemLimit, false);
atomicops::atomicAdd(&memUsedByEachJoin[index], memAfter - memBefore);
memBefore = memAfter;
if (!gotMem)
return;
}
memTrackDone.timed_wait(scoped, boost::posix_time::seconds(1));
}
// one more iteration to capture mem usage since last poll, for this one
// raise an error if mem went over the limit
memAfter = joiner->getMemUsage();
if (memAfter == memBefore)
return;
gotMem = resourceManager->getMemory(memAfter - memBefore, sessionMemLimit, false);
atomicops::atomicAdd(&memUsedByEachJoin[index], memAfter - memBefore);
if (!gotMem)
{
if (!joinIsTooBig && (isDML || !allowDJS || (fSessionId & 0x80000000) ||
(tableOid() < 3000 && tableOid() >= 1000)))
{
joinIsTooBig = true;
fLogger->logMessage(logging::LOG_TYPE_INFO, logging::ERR_JOIN_TOO_BIG);
errorMessage(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG));
status(logging::ERR_JOIN_TOO_BIG);
cout << "Join is too big, raise the UM join limit for now (monitor thread)" << endl;
abort();
}
}
}
void TupleHashJoinStep::startSmallRunners(uint index)
{
utils::setThreadName("HJSStartSmall");
string extendedInfo;
JoinType jt;
RowGroupDL* smallDL;
uint32_t smallIt;
RowGroup smallRG;
boost::shared_ptr<TupleJoiner> joiner;
string extendedInfo;
extendedInfo += toString(); // is each small side supposed to have the whole THJS info?
smallDL = smallDLs[index];
smallIt = smallIts[index];
smallRG = smallRGs[index];
jt = joinTypes[index];
extendedInfo += toString();
//cout << " smallRunner " << index << " sees jointype " << jt << " joinTypes has " << joinTypes.size()
// << " elements" << endl;
if (typelessJoin[index])
{
joiner.reset(new TupleJoiner(smallRG, largeRG, smallSideKeys[index],
largeSideKeys[index], jt));
joiner.reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index],
largeSideKeys[index], jt, &jobstepThreadPool));
}
else
{
joiner.reset(new TupleJoiner(smallRG, largeRG, smallSideKeys[index][0],
largeSideKeys[index][0], jt));
joiner.reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index][0],
largeSideKeys[index][0], jt, &jobstepThreadPool));
}
joiner->setUniqueLimit(uniqueLimit);
joiner->setTableName(smallTableNames[index]);
joiners[index] = joiner;
/* check for join types unsupported on the PM. */
if (!largeBPS || !isExeMgr)
joiner->setInUM(rgData[index]);
/*
read the small side into a TupleJoiner
send the TupleJoiner to the large side TBPS
start the large TBPS
read the large side, write to the output
start the small runners
join them
check status
handle abort, out of memory, etc
*/
smallRG.initRow(&r);
// cout << "reading smallDL" << endl;
more = smallDL->next(smallIt, &oneRG);
/* To measure wall-time spent constructing the small-side tables...
boost::posix_time::ptime end_time, start_time =
boost::posix_time::microsec_clock::universal_time();
*/
stopMemTracking = false;
uint64_t jobs[numCores];
uint64_t memMonitor = jobstepThreadPool.invoke([this, index] { this->trackMem(index); });
// starting 1 thread when in PM mode, since it's only inserting into a
// vector of rows. The rest will be started when converted to UM mode.
if (joiner->inUM())
for (int i = 0; i < numCores; i++)
jobs[i] = jobstepThreadPool.invoke([this, i, index, &jobs] { this->smallRunnerFcn(index, i, jobs); });
else
jobs[0] = jobstepThreadPool.invoke([this, index, &jobs] { this->smallRunnerFcn(index, 0, jobs); });
// wait for the first thread to join, then decide whether the others exist and need joining
jobstepThreadPool.join(jobs[0]);
if (joiner->inUM())
for (int i = 1; i < numCores; i++)
jobstepThreadPool.join(jobs[i]);
// stop the monitor thread
memTrackMutex.lock();
stopMemTracking = true;
memTrackDone.notify_one();
memTrackMutex.unlock();
jobstepThreadPool.join(memMonitor);
/* If there was an error or an abort, drain the input DL,
do endOfInput on the output */
if (cancelled())
{
// cout << "HJ stopping... status is " << status() << endl;
if (largeBPS)
largeBPS->abort();
bool more = true;
RGData oneRG;
while (more)
more = smallDLs[index]->next(smallIts[index], &oneRG);
}
/* To measure wall-time spent constructing the small-side tables...
end_time = boost::posix_time::microsec_clock::universal_time();
if (!(fSessionId & 0x80000000))
cout << "hash table construction time = " << end_time - start_time <<
" size = " << joiner->size() << endl;
*/
extendedInfo += "\n";
ostringstream oss;
if (joiner->inPM())
{
oss << "PM join (" << index << ")" << endl;
#ifdef JLF_DEBUG
cout << oss.str();
#endif
extendedInfo += oss.str();
}
else if (joiner->inUM() && !joiner->onDisk())
{
oss << "UM join (" << index << ")" << endl;
#ifdef JLF_DEBUG
cout << oss.str();
#endif
extendedInfo += oss.str();
}
/* Trying to get the extended info to match the original version
It's kind of kludgey at the moment, need to clean it up at some point */
if (!joiner->onDisk())
{
joiner->doneInserting();
boost::mutex::scoped_lock lk(*fStatsMutexPtr);
fExtendedInfo += extendedInfo;
formatMiniStats(index);
}
}
/* Index is which small input to read. */
void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t *jobs)
{
utils::setThreadName("HJSmallRunner");
bool more = true;
RGData oneRG;
Row r;
RowGroupDL* smallDL;
uint32_t smallIt;
RowGroup smallRG;
boost::shared_ptr<TupleJoiner> joiner = joiners[index];
smallDL = smallDLs[index];
smallIt = smallIts[index];
smallRG = smallRGs[index];
smallRG.initRow(&r);
try
{
/* check for join types unsupported on the PM. */
if (!largeBPS || !isExeMgr)
{
flippedUMSwitch = true;
oss << "UM join (" << index << ")";
#ifdef JLF_DEBUG
cout << oss.str() << endl;
#endif
extendedInfo += oss.str();
joiner->setInUM();
}
resourceManager->getMemory(joiner->getMemUsage(), sessionMemLimit, false);
(void)atomicops::atomicAdd(&totalUMMemoryUsage, joiner->getMemUsage());
memUsedByEachJoin[index] += joiner->getMemUsage();
ssize_t rgSize;
bool gotMem;
goto next;
while (more && !cancelled())
{
uint64_t memUseBefore, memUseAfter;
smallRG.setData(&oneRG);
if (smallRG.getRowCount() == 0)
goto next;
smallRG.getRow(0, &r);
memUseBefore = joiner->getMemUsage() + rgDataSize;
// TupleHJ owns the row memory
utils::getSpinlock(rgdLock);
rgData[index].push_back(oneRG);
rgDataSize += smallRG.getSizeWithStrings();
utils::releaseSpinlock(rgdLock);
for (i = 0; i < smallRG.getRowCount(); i++, r.nextRow())
rgSize = smallRG.getSizeWithStrings();
atomicops::atomicAdd(&memUsedByEachJoin[index], rgSize);
gotMem = resourceManager->getMemory(rgSize, sessionMemLimit, false);
if (!gotMem)
{
//cout << "inserting " << r.toString() << endl;
joiner->insert(r);
}
memUseAfter = joiner->getMemUsage() + rgDataSize;
if (UNLIKELY(!flippedUMSwitch && (memUseAfter >= pmMemLimit)))
{
flippedUMSwitch = true;
oss << "UM join (" << index << ") ";
#ifdef JLF_DEBUG
cout << oss.str() << endl;
#endif
extendedInfo += oss.str();
joiner->setInUM();
memUseAfter = joiner->getMemUsage() + rgDataSize;
}
gotMem = resourceManager->getMemory(memUseAfter - memUseBefore, sessionMemLimit, false);
atomicops::atomicAdd(&totalUMMemoryUsage, memUseAfter - memUseBefore);
memUsedByEachJoin[index] += memUseAfter - memUseBefore;
/* This is kind of kludgy and overlaps with segreateJoiners() atm.
If this join won't be converted to disk-based, this fcn needs to abort the same as
it did before. If it will be converted, it should just return. */
if (UNLIKELY(!gotMem))
{
if (isDML || !allowDJS || (fSessionId & 0x80000000) ||
(tableOid() < 3000 && tableOid() >= 1000))
boost::unique_lock<boost::mutex> sl(saneErrMsg);
if (!joinIsTooBig && (isDML || !allowDJS || (fSessionId & 0x80000000) ||
(tableOid() < 3000 && tableOid() >= 1000)))
{
joinIsTooBig = true;
fLogger->logMessage(logging::LOG_TYPE_INFO, logging::ERR_JOIN_TOO_BIG);
errorMessage(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG));
status(logging::ERR_JOIN_TOO_BIG);
cout << "Join is too big, raise the UM join limit for now" << endl;
cout << "Join is too big, raise the UM join limit for now (small runner)" << endl;
abort();
break;
}
else
{
joiner->setConvertToDiskJoin();
return;
}
}
joiner->insertRGData(smallRG, threadID);
if (!joiner->inUM() && (memUsedByEachJoin[index] > pmMemLimit))
{
joiner->setInUM(rgData[index]);
for (int i = 1; i < numCores; i++)
jobs[i] = jobstepThreadPool.invoke([this, i, index, jobs]
{ this->smallRunnerFcn(index, i, jobs); });
}
next:
// cout << "inserted one rg into the joiner, rowcount = " <<
// smallRG.getRowCount() << endl;
dlMutex.lock();
more = smallDL->next(smallIt, &oneRG);
dlMutex.unlock();
}
}
catch (boost::exception& e)
@@ -356,34 +458,8 @@ next:
status(logging::ERR_EXEMGR_MALFUNCTION);
}
if (!flippedUMSwitch && !cancelled())
{
oss << "PM join (" << index << ")";
#ifdef JLF_DEBUG
cout << oss.str() << endl;
#endif
extendedInfo += oss.str();
if (!joiner->inUM())
joiner->setInPM();
}
/* If there was an error or an abort drain the input DL,
do endOfInput on the output */
if (cancelled())
{
// cout << "HJ stopping... status is " << status() << endl;
if (largeBPS)
largeBPS->abort();
while (more)
more = smallDL->next(smallIt, &oneRG);
}
joiner->doneInserting();
extendedInfo += "\n";
boost::mutex::scoped_lock lk(*fStatsMutexPtr);
fExtendedInfo += extendedInfo;
formatMiniStats(index);
}
void TupleHashJoinStep::forwardCPData()
@@ -517,24 +593,6 @@ void TupleHashJoinStep::djsReaderFcn(int index)
processFE2(l_outputRG, l_fe2RG, fe2InRow, fe2OutRow, &v_rgData, &l_fe);
processDupList(0, (fe2 ? l_fe2RG : l_outputRG), &v_rgData);
#if 0
if (!v_rgData.empty() && fSessionId < 0x80000000)
{
if (fe2)
{
l_fe2RG.setData(&v_rgData[0]);
cout << "fully processed rowgroup: " << l_fe2RG.toString() << endl;
}
else
{
l_outputRG.setData(&v_rgData[0]);
cout << "fully processed rowgroup: " << l_outputRG.toString() << endl;
}
}
cout << "ownsOutputDL = " << (int) ownsOutputDL << " fDelivery = " << (int) fDelivery << endl;
#endif
sendResult(v_rgData);
}
@@ -553,6 +611,7 @@ void TupleHashJoinStep::djsReaderFcn(int index)
void TupleHashJoinStep::hjRunner()
{
uint32_t i;
std::vector<uint64_t> smallRunners; // thread handles from thread pool
if (cancelled())
{
@@ -581,7 +640,7 @@ void TupleHashJoinStep::hjRunner()
/* Start the small-side runners */
rgData.reset(new vector<RGData>[smallDLs.size()]);
memUsedByEachJoin.reset(new uint64_t[smallDLs.size()]);
memUsedByEachJoin.reset(new ssize_t[smallDLs.size()]);
for (i = 0; i < smallDLs.size(); i++)
memUsedByEachJoin[i] = 0;
@@ -680,7 +739,7 @@ void TupleHashJoinStep::hjRunner()
{
vector<RGData> empty;
resourceManager->returnMemory(memUsedByEachJoin[djsJoinerMap[i]], sessionMemLimit);
atomicops::atomicSub(&totalUMMemoryUsage, memUsedByEachJoin[djsJoinerMap[i]]);
memUsedByEachJoin[djsJoinerMap[i]] = 0;
djs[i].loadExistingData(rgData[djsJoinerMap[i]]);
rgData[djsJoinerMap[i]].swap(empty);
}
@@ -792,8 +851,11 @@ void TupleHashJoinStep::hjRunner()
joiners.clear();
tbpsJoiners.clear();
rgData.reset();
resourceManager->returnMemory(totalUMMemoryUsage, sessionMemLimit);
totalUMMemoryUsage = 0;
for (uint i = 0; i < smallDLs.size(); i++)
{
resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit);
memUsedByEachJoin[i] = 0;
}
}
}
}
@@ -840,7 +902,7 @@ void TupleHashJoinStep::hjRunner()
#ifdef JLF_DEBUG
cout << "moving join " << i << " to UM (PM join can't follow a UM join)\n";
#endif
tbpsJoiners[i]->setInUM();
tbpsJoiners[i]->setInUM(rgData[i]);
}
}
@@ -880,12 +942,10 @@ void TupleHashJoinStep::hjRunner()
}
#ifdef JLF_DEBUG
if (runFE2onPM)
cout << "PM runs FE2\n";
else
cout << "UM runs FE2\n";
#endif
largeBPS->setFcnExpGroup2(fe2, fe2Output, runFE2onPM);
}
@@ -971,8 +1031,11 @@ uint32_t TupleHashJoinStep::nextBand(messageqcpp::ByteStream& bs)
joiners.clear();
rgData.reset();
resourceManager->returnMemory(totalUMMemoryUsage, sessionMemLimit);
totalUMMemoryUsage = 0;
for (uint i = 0; i < smallDLs.size(); i++)
{
resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit);
memUsedByEachJoin[i] = 0;
}
return 0;
}
@@ -992,8 +1055,11 @@ uint32_t TupleHashJoinStep::nextBand(messageqcpp::ByteStream& bs)
cout << " -- returning error status " << deliveredRG->getStatus() << endl;
deliveredRG->serializeRGData(bs);
resourceManager->returnMemory(totalUMMemoryUsage, sessionMemLimit);
totalUMMemoryUsage = 0;
for (uint i = 0; i < smallDLs.size(); i++)
{
resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit);
memUsedByEachJoin[i] = 0;
}
return 0;
}
@@ -1850,7 +1916,8 @@ void TupleHashJoinStep::segregateJoiners()
return;
}
/* Debugging code, this makes all eligible joins disk-based.
#if 0
// Debugging code, this makes all eligible joins disk-based.
else {
cout << "making all joins disk-based" << endl;
for (i = 0; i < smallSideCount; i++) {
@@ -1860,7 +1927,7 @@ void TupleHashJoinStep::segregateJoiners()
}
return;
}
*/
#endif
/* For now if there is no largeBPS all joins need to either be DJS or not, not mixed */
if (!largeBPS)

View File

@@ -75,6 +75,8 @@ public:
void tableOid1(execplan::CalpontSystemCatalog::OID tableOid1)
{
fTableOID1 = tableOid1;
if (fTableOID1 < 3000)
numCores = 1; // syscat query, no need for more than 1 thread
}
void tableOid2(execplan::CalpontSystemCatalog::OID tableOid2)
{
@@ -425,7 +427,6 @@ private:
std::vector<std::vector<uint32_t> > smallSideKeys;
ResourceManager* resourceManager;
volatile uint64_t totalUMMemoryUsage;
struct JoinerSorter
{
@@ -436,16 +437,14 @@ private:
}
};
std::vector<boost::shared_ptr<joiner::TupleJoiner> > joiners;
boost::scoped_array<std::vector<rowgroup::RGData> > rgData;
TupleBPS* largeBPS;
rowgroup::RowGroup largeRG, outputRG;
std::vector<rowgroup::RowGroup> smallRGs;
uint64_t pmMemLimit;
uint64_t rgDataSize;
ssize_t pmMemLimit;
void hjRunner();
void smallRunnerFcn(uint32_t index);
void smallRunnerFcn(uint32_t index, uint threadID, uint64_t *threads);
struct HJRunner
{
@@ -462,15 +461,13 @@ private:
SmallRunner(TupleHashJoinStep* hj, uint32_t i) : HJ(hj), index(i) { }
void operator()()
{
utils::setThreadName("HJSSmallSide");
HJ->smallRunnerFcn(index);
HJ->startSmallRunners(index);
}
TupleHashJoinStep* HJ;
uint32_t index;
};
int64_t mainRunner; // thread handle from thread pool
std::vector<uint64_t> smallRunners; // thread handles from thread pool
// for notify TupleAggregateStep PM hashjoin
// Ideally, hashjoin and delivery communicate with RowGroupDL,
@@ -614,10 +611,19 @@ private:
std::vector<boost::shared_ptr<joiner::TupleJoiner> > tbpsJoiners;
std::vector<boost::shared_ptr<joiner::TupleJoiner> > djsJoiners;
std::vector<int> djsJoinerMap;
boost::scoped_array<uint64_t> memUsedByEachJoin;
boost::scoped_array<ssize_t> memUsedByEachJoin;
boost::mutex djsLock;
boost::shared_ptr<int64_t> sessionMemLimit;
/* Threaded UM join support */
int numCores;
boost::mutex dlMutex, memTrackMutex, saneErrMsg;
boost::condition memTrackDone;
std::atomic<bool> rgdLock;
bool stopMemTracking;
void trackMem(uint index);
void startSmallRunners(uint index);
friend class DiskJoinStep;
};

View File

@@ -1647,8 +1647,8 @@ void WindowFunctionStep::sort(std::vector<RowPosition>::iterator v, uint64_t n)
}
}
sort(v, distance(v, h) + 1);
sort(l, distance(l, v) + n);
sort(v, std::distance(v, h) + 1);
sort(l, std::distance(l, v) + n);
}

View File

@@ -8,6 +8,7 @@ include_directories( ${ENGINE_COMMON_INCLUDES}
SET ( libcalmysql_SRCS
ha_mcs_sysvars.cpp
ha_mcs_client_udfs.cpp
ha_mcs_opt_rewrites.cpp
ha_mcs_pushdown.cpp
ha_mcs.cpp
ha_mcs_impl.cpp
@@ -22,96 +23,37 @@ SET ( libcalmysql_SRCS
ha_view.cpp sm.cpp
ha_window_function.cpp
ha_mcs_partition.cpp
ha_pseudocolumn.cpp)
ha_pseudocolumn.cpp
is_columnstore_tables.cpp
is_columnstore_columns.cpp
is_columnstore_files.cpp
is_columnstore_extents.cpp)
add_definitions(-DMYSQL_DYNAMIC_PLUGIN)
set_source_files_properties(ha_mcs.cpp PROPERTIES COMPILE_FLAGS "-fno-implicit-templates")
add_library(calmysql SHARED ${libcalmysql_SRCS})
if (COMMAND mysql_add_plugin)
mysql_add_plugin(columnstore ${libcalmysql_SRCS} STORAGE_ENGINE MODULE_ONLY DEFAULT
LINK_LIBRARIES ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${NETSNMP_LIBRARIES} threadpool
COMPONENT columnstore-engine)
else ()
add_library(ha_columnstore SHARED ${libcalmysql_SRCS})
SET_TARGET_PROPERTIES(ha_columnstore PROPERTIES PREFIX "")
target_link_libraries(calmysql ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${NETSNMP_LIBRARIES} ${SERVER_BUILD_DIR}/libservices/libmysqlservices.a threadpool)
target_link_libraries(ha_columnstore ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${NETSNMP_LIBRARIES} ${SERVER_BUILD_DIR}/libservices/libmysqlservices.a threadpool)
set_target_properties(calmysql PROPERTIES VERSION 1.0.0 SOVERSION 1)
SET ( is_columnstore_tables_SRCS
is_columnstore_tables.cpp
sm.cpp
)
add_library(is_columnstore_tables SHARED ${is_columnstore_tables_SRCS})
target_link_libraries(is_columnstore_tables ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${SERVER_BUILD_DIR}/libservices/libmysqlservices.a threadpool)
# Don't prepend .so file with 'lib'
set_target_properties(is_columnstore_tables PROPERTIES PREFIX "")
set_target_properties(is_columnstore_tables PROPERTIES VERSION 1.0.0 SOVERSION 1)
SET ( is_columnstore_columns_SRCS
is_columnstore_columns.cpp
sm.cpp
)
add_library(is_columnstore_columns SHARED ${is_columnstore_columns_SRCS})
target_link_libraries(is_columnstore_columns ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${SERVER_BUILD_DIR}/libservices/libmysqlservices.a threadpool)
# Don't prepend .so file with 'lib'
set_target_properties(is_columnstore_columns PROPERTIES PREFIX "")
set_target_properties(is_columnstore_columns PROPERTIES VERSION 1.0.0 SOVERSION 1)
SET ( is_columnstore_extents_SRCS
is_columnstore_extents.cpp
sm.cpp
)
add_library(is_columnstore_extents SHARED ${is_columnstore_extents_SRCS})
target_link_libraries(is_columnstore_extents ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${SERVER_BUILD_DIR}/libservices/libmysqlservices.a threadpool)
# Don't prepend .so file with 'lib'
set_target_properties(is_columnstore_extents PROPERTIES PREFIX "")
set_target_properties(is_columnstore_extents PROPERTIES VERSION 1.0.0 SOVERSION 1)
SET ( is_columnstore_files_SRCS
is_columnstore_files.cpp
sm.cpp
)
add_library(is_columnstore_files SHARED ${is_columnstore_files_SRCS})
target_link_libraries(is_columnstore_files ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${SERVER_BUILD_DIR}/libservices/libmysqlservices.a threadpool)
# Don't prepend .so file with 'lib'
set_target_properties(is_columnstore_files PROPERTIES PREFIX "")
set_target_properties(is_columnstore_files PROPERTIES VERSION 1.0.0 SOVERSION 1)
install(TARGETS calmysql is_columnstore_tables is_columnstore_columns is_columnstore_extents is_columnstore_files DESTINATION ${MARIADB_PLUGINDIR} COMPONENT storage-engine)
install(TARGETS ha_columnstore DESTINATION ${MARIADB_PLUGINDIR} COMPONENT columnstore-engine)
endif ()
install(FILES syscatalog_mysql.sql
dumpcat_mysql.sql
calsetuserpriority.sql
calremoveuserpriority.sql
calshowprocesslist.sql
columnstore_info.sql
DESTINATION ${ENGINE_SUPPORTDIR} COMPONENT storage-engine)
DESTINATION ${ENGINE_SUPPORTDIR} COMPONENT columnstore-engine)
install(PROGRAMS install_mcs_mysql.sh mysql-Columnstore
DESTINATION ${ENGINE_SBINDIR} COMPONENT storage-engine)
DESTINATION ${ENGINE_SBINDIR} COMPONENT columnstore-engine)
install(FILES columnstore.cnf
DESTINATION ${MARIADB_MYCNFDIR} COMPONENT storage-engine)
#AM_CPPFLAGS = $(idb_common_includes) $(idb_cppflags)
#AM_CFLAGS = $(idb_cflags)
#AM_CXXFLAGS = $(idb_cxxflags)
#AM_LDFLAGS = $(idb_ldflags)
#lib_LTLIBRARIES = libcalmysql.la
#libcalmysql_la_SOURCES = ha_mcs.cpp ha_mcs_impl.cpp ha_mcs_dml.cpp ha_mcs_ddl.cpp ha_mcs_execplan.cpp ha_scalar_sub.cpp ha_in_sub.cpp ha_exists_sub.cpp ha_from_sub.cpp ha_select_sub.cpp ha_view.cpp sm.cpp ha_window_function.cpp ha_mcs_partition.cpp ha_pseudocolumn.cpp
#libcalmysql_la_LDFLAGS = -version-info 1:0:0 $(idb_common_ldflags) $(idb_common_libs) $(idb_write_libs) $(AM_LDFLAGS)
#libcalmysql_la_CPPFLAGS = -I/usr/include/libxml2 -I../../../mysql/include -I../../../mysql/sql -I../../../mysql/regex -DMYSQL_DYNAMIC_PLUGIN $(AM_CPPFLAGS)
#include_HEADERS = idb_mysql.h
#
#dist_mysql_DATA = syscatalog_mysql.sql dumpcat_mysql.sql calsetuserpriority.sql calremoveuserpriority.sql calshowprocesslist.sql my.cnf
#dist_mysql_SCRIPTS = install_mcs_mysql.sh mysql-Columnstore dumpcat.pl
#
#libcalmysql_la-ha_mcs.lo: ha_mcs.cpp
# if $(LIBTOOL) --tag=CXX --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(libcalmysql_la_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -fno-rtti -fno-implicit-templates -MT libcalmysql_la-ha_mcs.lo -MD -MP -MF "$(DEPDIR)/libcalmysql_la-ha_mcs.Tpo" -c -o libcalmysql_la-ha_mcs.lo `test -f 'ha_mcs.cpp' || echo '$(srcdir)/'`ha_mcs.cpp; \
# then mv -f "$(DEPDIR)/libcalmysql_la-ha_mcs.Tpo" "$(DEPDIR)/libcalmysql_la-ha_mcs.Plo"; else rm -f "$(DEPDIR)/libcalmysql_la-ha_mcs.Tpo"; exit 1; fi
DESTINATION ${MARIADB_MYCNFDIR} COMPONENT columnstore-engine)

View File

@@ -39,8 +39,4 @@ server-id = 1
lower_case_table_names=1
plugin-load-add=libcalmysql.so
plugin-load-add=is_columnstore_tables.so
plugin-load-add=is_columnstore_columns.so
plugin-load-add=is_columnstore_extents.so
plugin-load-add=is_columnstore_files.so
plugin-load-add=ha_columnstore.so

View File

@@ -326,8 +326,7 @@ FromSubQuery::FromSubQuery(gp_walk_info& gwip,
SELECT_LEX* sub,
bool isPushdownHandler) :
SubQuery(gwip),
fFromSub(sub),
fPushdownHand(isPushdownHandler)
fFromSub(sub)
{}
FromSubQuery::~FromSubQuery()
@@ -349,9 +348,7 @@ SCSEP FromSubQuery::transform()
csep->derivedTbAlias(fAlias); // always lower case
csep->derivedTbView(fGwip.viewName.alias);
// DRRTUY isUnion - false. fPushdownHand could be safely set to true
// b/c only pushdowns get here.
if (getSelectPlan(gwi, *fFromSub, csep, false, fPushdownHand) != 0)
if (getSelectPlan(gwi, *fFromSub, csep, false, true) != 0)
{
fGwip.fatalParseError = true;

View File

@@ -23,6 +23,7 @@
#include "ha_mcs_pushdown.h"
#define NEED_CALPONT_EXTERNS
#include "ha_mcs_impl.h"
#include "is_columnstore.h"
static handler* calpont_create_handler(handlerton* hton,
TABLE_SHARE* table,
@@ -911,6 +912,10 @@ const COND* ha_mcs::cond_push(const COND* cond)
struct st_mysql_storage_engine columnstore_storage_engine =
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
static struct st_mysql_information_schema is_columnstore_plugin_version =
{ MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION };
mysql_declare_plugin(columnstore)
{
MYSQL_STORAGE_ENGINE_PLUGIN,
@@ -943,6 +948,70 @@ maria_declare_plugin(columnstore)
mcs_system_variables, /* system variables */
"1.0", /* string version */
MariaDB_PLUGIN_MATURITY_STABLE /* maturity */
},
{
MYSQL_INFORMATION_SCHEMA_PLUGIN,
&is_columnstore_plugin_version,
"COLUMNSTORE_COLUMNS",
"MariaDB Corporation",
"An information schema plugin to list ColumnStore columns",
PLUGIN_LICENSE_GPL,
is_columnstore_columns_plugin_init,
//is_columnstore_tables_plugin_deinit,
NULL,
0x0100,
NULL,
NULL,
"1.0",
MariaDB_PLUGIN_MATURITY_STABLE
},
{
MYSQL_INFORMATION_SCHEMA_PLUGIN,
&is_columnstore_plugin_version,
"COLUMNSTORE_TABLES",
"MariaDB Corporation",
"An information schema plugin to list ColumnStore tables",
PLUGIN_LICENSE_GPL,
is_columnstore_tables_plugin_init,
//is_columnstore_tables_plugin_deinit,
NULL,
0x0100,
NULL,
NULL,
"1.0",
MariaDB_PLUGIN_MATURITY_STABLE
},
{
MYSQL_INFORMATION_SCHEMA_PLUGIN,
&is_columnstore_plugin_version,
"COLUMNSTORE_FILES",
"MariaDB Corporation",
"An information schema plugin to list ColumnStore filess",
PLUGIN_LICENSE_GPL,
is_columnstore_files_plugin_init,
//is_columnstore_files_plugin_deinit,
NULL,
0x0100,
NULL,
NULL,
"1.0",
MariaDB_PLUGIN_MATURITY_STABLE
},
{
MYSQL_INFORMATION_SCHEMA_PLUGIN,
&is_columnstore_plugin_version,
"COLUMNSTORE_EXTENTS",
"MariaDB Corporation",
"An information schema plugin to list ColumnStore extents",
PLUGIN_LICENSE_GPL,
is_columnstore_extents_plugin_init,
//is_columnstore_extents_plugin_deinit,
NULL,
0x0100,
NULL,
NULL,
"1.0",
MariaDB_PLUGIN_MATURITY_STABLE
}
maria_declare_plugin_end;

View File

@@ -1,5 +1,6 @@
/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016 MariaDB Corporation
Copyright (C) 2019 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License

View File

@@ -16,11 +16,6 @@
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/*
* $Id: ha_mcs_execplan.cpp 9749 2013-08-15 04:00:39Z zzhu $
*/
/** @file */
//#define DEBUG_WALK_COND
#include <my_config.h>
#include <string>
@@ -61,7 +56,6 @@ using namespace logging;
#include "ha_mcs_impl_if.h"
#include "ha_mcs_sysvars.h"
#include "ha_subquery.h"
//#include "ha_view.h"
using namespace cal_impl_if;
#include "calpontselectexecutionplan.h"
@@ -132,7 +126,6 @@ public:
gp_walk_info* fgwip;
};
//#define OUTER_JOIN_DEBUG
namespace
{
string lower(string str)
@@ -1321,6 +1314,7 @@ uint32_t buildOuterJoin(gp_walk_info& gwi, SELECT_LEX& select_lex)
if (table_ptr->outer_join && table_ptr->on_expr)
{
// inner tables block
Item_cond* expr = reinterpret_cast<Item_cond*>(table_ptr->on_expr);
gwi_outer.innerTables.insert(tan);
@@ -1392,8 +1386,10 @@ uint32_t buildOuterJoin(gp_walk_info& gwi, SELECT_LEX& select_lex)
#endif
expr->traverse_cond(gp_walk, &gwi_outer, Item::POSTFIX);
}
// *DRRTUY This part is to be removed in 1.4.3
#if 0
// @bug 2849
else if (table_ptr->embedding && table_ptr->embedding->nested_join)
/*else if (table_ptr->embedding && table_ptr->embedding->nested_join)
{
// if this is dervied table process phase, mysql may have not developed the plan
// completely. Return and let it finish. It will come to rnd_init again.
@@ -1434,8 +1430,8 @@ uint32_t buildOuterJoin(gp_walk_info& gwi, SELECT_LEX& select_lex)
}
}
}
}
} */
#endif
// Error out subquery in outer join on filter for now
if (gwi_outer.hasSubSelect)
{
@@ -1444,9 +1440,8 @@ uint32_t buildOuterJoin(gp_walk_info& gwi, SELECT_LEX& select_lex)
setError(gwi.thd, ER_INTERNAL_ERROR, gwi.parseErrorText);
return -1;
}
// build outerjoinon filter
ParseTree* filters = NULL, *ptp = NULL, /**rhs = NULL*/*lhs = NULL;
ParseTree* filters = NULL, *ptp = NULL, *lhs = NULL;
while (!gwi_outer.ptWorkStack.empty())
{
@@ -3173,6 +3168,15 @@ ReturnedColumn* buildReturnedColumn(Item* item, gp_walk_info& gwi, bool& nonSupp
}
case STRING_RESULT:
{
// Special handling for 0xHHHH literals
const Type_handler *tph = item->type_handler();
if (typeid(*tph) == typeid(Type_handler_hex_hybrid))
{
Item_hex_hybrid *hip = reinterpret_cast<Item_hex_hybrid*>(const_cast<Item*>(item));
rc = new ConstantColumn((int64_t)hip->val_int(), ConstantColumn::NUM);
break;
}
String val, *str = item->val_str(&val);
string valStr;
valStr.assign(str->ptr(), str->length());
@@ -3680,6 +3684,29 @@ ReturnedColumn* buildFunctionColumn(
fc = buildCaseFunction(ifp, gwi, nonSupport);
}
else if ((funcName == "charset" || funcName == "collation") &&
ifp->argument_count() == 1 &&
ifp->arguments()[0]->type() == Item::FIELD_ITEM)
{
Item_field *item = reinterpret_cast<Item_field*>(ifp->arguments()[0]);
CHARSET_INFO* info = item->charset_for_protocol();
ReturnedColumn* rc;
string val;
if (funcName == "charset")
{
val = info->csname;
}
else // collation
{
val = info->name;
}
rc = new ConstantColumn(val, ConstantColumn::LITERAL);
return rc;
}
else if ((functor = funcExp->getFunctor(funcName)))
{
// where clause isnull still treated as predicate operator
@@ -6064,69 +6091,15 @@ bool isMCSTable(TABLE* table_ptr)
return false;
}
int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
SCSEP& csep,
bool isUnion,
bool isPushdownHand)
/*@brief set some runtime params to run the query */
/***********************************************************
* DESCRIPTION:
* This function just sets a number of runtime params that
* limits resource consumed.
***********************************************************/
void setExecutionParams(gp_walk_info &gwi, SCSEP &csep)
{
#ifdef DEBUG_WALK_COND
cerr << "getSelectPlan()" << endl;
#endif
// by pass the derived table resolve phase of mysql
if ( !isPushdownHand &&
!(((gwi.thd->lex)->sql_command == SQLCOM_UPDATE ) ||
((gwi.thd->lex)->sql_command == SQLCOM_DELETE ) ||
((gwi.thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) ||
((gwi.thd->lex)->sql_command == SQLCOM_DELETE_MULTI ) ) && gwi.thd->derived_tables_processing)
{
// MCOL-2178 isUnion member only assigned, never used
//MIGR::infinidb_vtable.isUnion = false;
return -1;
}
// rollup is currently not supported
if (select_lex.olap == ROLLUP_TYPE)
{
gwi.fatalParseError = true;
gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_ROLLUP_NOT_SUPPORT);
setError(gwi.thd, ER_CHECK_NOT_IMPLEMENTED, gwi.parseErrorText, gwi);
return ER_CHECK_NOT_IMPLEMENTED;
}
gwi.internalDecimalScale = (get_use_decimal_scale(gwi.thd) ? get_decimal_scale(gwi.thd) : -1);
gwi.subSelectType = csep->subType();
JOIN* join = select_lex.join;
Item_cond* icp = 0;
if (join != 0)
icp = reinterpret_cast<Item_cond*>(join->conds);
// if icp is null, try to find the where clause other where
if (!join && gwi.thd->lex->derived_tables)
{
if (select_lex.prep_where)
icp = (Item_cond*)(select_lex.prep_where);
else if (select_lex.where)
icp = (Item_cond*)(select_lex.where);
}
else if (!join && ( ((gwi.thd->lex)->sql_command == SQLCOM_UPDATE ) ||
((gwi.thd->lex)->sql_command == SQLCOM_DELETE ) ||
((gwi.thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) ||
((gwi.thd->lex)->sql_command == SQLCOM_DELETE_MULTI )))
{
icp = reinterpret_cast<Item_cond*>(select_lex.where);
}
uint32_t sessionID = csep->sessionID();
gwi.sessionid = sessionID;
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
csc->identity(CalpontSystemCatalog::FE);
csep->timeZone(gwi.thd->variables.time_zone->get_name()->ptr());
gwi.csc = csc;
// @bug 2123. Override large table estimate if infinidb_ordered hint was used.
// @bug 2404. Always override if the infinidb_ordered_only variable is turned on.
if (get_ordered_only(gwi.thd))
@@ -6148,13 +6121,30 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
csep->umMemLimit(numeric_limits<int64_t>::max());
else
csep->umMemLimit(get_um_mem_limit(gwi.thd) * 1024ULL * 1024);
}
/*@brief Process FROM part of the query or sub-query */
/***********************************************************
* DESCRIPTION:
* This function processes elements of List<TABLE_LIST> in
* FROM part of the query.
* isUnion tells that CS processes FROM taken from UNION UNIT.
* The notion is described in MDB code.
* on_expr_list ON expressions used in OUTER JOINs. These are
* later used in processWhere()
* RETURNS
* error id as an int
***********************************************************/
int processFrom(bool &isUnion,
SELECT_LEX &select_lex,
gp_walk_info &gwi,
SCSEP &csep,
List<Item> &on_expr_list)
{
// populate table map and trigger syscolumn cache for all the tables (@bug 1637).
// all tables on FROM list must have at least one col in colmap
TABLE_LIST* table_ptr = select_lex.get_table_list();
CalpontSelectExecutionPlan::SelectList derivedTbList;
// DEBUG
#ifdef DEBUG_WALK_COND
List_iterator<TABLE_LIST> sj_list_it(select_lex.sj_nests);
TABLE_LIST* sj_nest;
@@ -6163,12 +6153,8 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
{
cerr << sj_nest->db.str << "." << sj_nest->table_name.str << endl;
}
#endif
// @bug 1796. Remember table order on the FROM list.
gwi.clauseType = FROM;
try
{
for (; table_ptr; table_ptr = table_ptr->next_local)
@@ -6182,17 +6168,20 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
setError(gwi.thd, ER_CHECK_NOT_IMPLEMENTED, gwi.parseErrorText, gwi);
return ER_CHECK_NOT_IMPLEMENTED;
}
// Save on_expr to use it for WHERE processing
if (!table_ptr->outer_join && table_ptr->on_expr)
{
on_expr_list.push_back(table_ptr->on_expr);
}
string viewName = getViewName(table_ptr);
// @todo process from subquery
if (table_ptr->derived)
{
String str;
(table_ptr->derived->first_select())->print(gwi.thd, &str, QT_ORDINARY);
SELECT_LEX* select_cursor = table_ptr->derived->first_select();
FromSubQuery fromSub(gwi, select_cursor, isPushdownHand);
FromSubQuery fromSub(gwi, select_cursor, true);
string alias(table_ptr->alias.str);
fromSub.alias(lower(alias));
@@ -6203,7 +6192,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
if (!plan)
{
setError(gwi.thd, ER_INTERNAL_ERROR, fromSub.gwip().parseErrorText, gwi);
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
CalpontSystemCatalog::removeCalpontSystemCatalog(gwi.sessionid);
return ER_INTERNAL_ERROR;
}
@@ -6229,7 +6218,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
// trigger system catalog cache
if (columnStore)
csc->columnRIDs(make_table(table_ptr->db.str, table_ptr->table_name.str), true);
gwi.csc->columnRIDs(make_table(table_ptr->db.str, table_ptr->table_name.str), true);
string table_name = table_ptr->table_name.str;
@@ -6256,7 +6245,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
catch (IDBExcept& ie)
{
setError(gwi.thd, ER_INTERNAL_ERROR, ie.what(), gwi);
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
CalpontSystemCatalog::removeCalpontSystemCatalog(gwi.sessionid);
// @bug 3852. set error status for gwi.
gwi.fatalParseError = true;
gwi.parseErrorText = ie.what();
@@ -6269,14 +6258,14 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
gwi.fatalParseError = true;
gwi.parseErrorText = emsg;
setError(gwi.thd, ER_INTERNAL_ERROR, emsg, gwi);
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
CalpontSystemCatalog::removeCalpontSystemCatalog(gwi.sessionid);
return ER_INTERNAL_ERROR;
}
csep->tableList(gwi.tbList);
// Send this recursively to getSelectPlan
bool unionSel = false;
// UNION master unit check
// Existed pushdown handlers won't get in this scope
// except UNION pushdown that is to come.
@@ -6299,25 +6288,12 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
plan->traceFlags(csep->traceFlags());
plan->data(csep->data());
// @bug 3853. When one or more sides or union queries contain derived tables,
// sl->join->zero_result_cause is not trustable. Since we've already handled
// constant filter now (0/1), we can relax the following checking.
// @bug 2547. ignore union unit of zero result set case
// if (sl->join)
// {
// sl->join->optimize();
// @bug 3067. not clear MySQL's behavior. when in subquery, this variable
// is not trustable.
// if (sl->join->zero_result_cause && !gwi.subQuery)
// continue;
// }
// gwi for the union unit
gp_walk_info union_gwi;
union_gwi.thd = gwi.thd;
uint32_t err = 0;
if ((err = getSelectPlan(union_gwi, *sl, plan, unionSel, isPushdownHand)) != 0)
if ((err = getSelectPlan(union_gwi, *sl, plan, unionSel, true)) != 0)
return err;
unionVec.push_back(SCEP(plan));
@@ -6325,63 +6301,67 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
// distinct union num
if (sl == select_lex.master_unit()->union_distinct)
distUnionNum = unionVec.size();
/*#ifdef DEBUG_WALK_COND
IDEBUG( cerr << ">>>> UNION DEBUG" << endl );
JOIN* join = sl->join;
Item_cond* icp = 0;
if (join != 0)
icp = reinterpret_cast<Item_cond*>(join->conds);
if (icp)
icp->traverse_cond(debug_walk, &gwi, Item::POSTFIX);
IDEBUG ( cerr << *plan << endl );
IDEBUG ( cerr << "<<<<UNION DEBUG" << endl );
#endif*/
}
csep->unionVec(unionVec);
csep->distinctUnionNum(distUnionNum);
}
gwi.clauseType = WHERE;
return 0;
}
/*@brief Process WHERE part of the query or sub-query */
/***********************************************************
* DESCRIPTION:
* This function processes conditions from either JOIN->conds
* or SELECT_LEX->where|prep_where
* on_expr_list ON expressions used in OUTER JOINs. These are
* populated used in processFrom()
* RETURNS
* error id as an int
***********************************************************/
int processWhere(SELECT_LEX &select_lex,
gp_walk_info &gwi,
SCSEP &csep,
List<Item> &on_expr_list)
{
JOIN* join = select_lex.join;
Item_cond* icp = 0;
if (join != 0)
icp = reinterpret_cast<Item_cond*>(join->conds);
// if icp is null, try to find the where clause other where
if (!join && gwi.thd->lex->derived_tables)
{
if (select_lex.prep_where)
icp = (Item_cond*)(select_lex.prep_where);
else if (select_lex.where)
icp = (Item_cond*)(select_lex.where);
}
else if (!join && ( ((gwi.thd->lex)->sql_command == SQLCOM_UPDATE ) ||
((gwi.thd->lex)->sql_command == SQLCOM_DELETE ) ||
((gwi.thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) ||
((gwi.thd->lex)->sql_command == SQLCOM_DELETE_MULTI )))
{
icp = reinterpret_cast<Item_cond*>(select_lex.where);
}
if (icp)
{
// MariaDB bug 624 - without the fix_fields call, delete with join may error with "No query step".
//#if MYSQL_VERSION_ID < 50172
// MariaDB bug 624 - without the fix_fields call, delete with join may error with "No query step".
//@bug 3039. fix fields for constants
if (!icp->is_fixed())
{
icp->fix_fields(gwi.thd, (Item**)&icp);
}
//#endif
gwi.fatalParseError = false;
#ifdef DEBUG_WALK_COND
cerr << "------------------ WHERE -----------------------" << endl;
std::cerr << "------------------ WHERE -----------------------" << std::endl;
icp->traverse_cond(debug_walk, &gwi, Item::POSTFIX);
if (join && join->cond_equal)
{
List_iterator<Item_equal> li(join->cond_equal->current_level);
Item_equal *cur_item_eq;
while ((cur_item_eq= li++))
{
// DRRTUY TODO replace the block with
//cur_item_eq->traverse_cond(debug_walk, gwip, Item::POSTFIX);
std::cerr << "item_equal(";
Item *item;
Item_equal_fields_iterator it(*cur_item_eq);
while ((item= it++))
{
std::ostringstream ostream;
std::ostringstream& osr = ostream;
getColNameFromItem(osr, item);
std::cerr << osr.str() << ",";
}
std::cerr << ")" << std::endl;
}
}
cerr << "------------------------------------------------\n" << endl;
std::cerr << "------------------------------------------------\n" << std::endl;
#endif
icp->traverse_cond(gp_walk, &gwi, Item::POSTFIX);
@@ -6410,6 +6390,26 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
(dynamic_cast<ConstantColumn*>(gwi.rcWorkStack.top()))->timeZone(gwi.thd->variables.time_zone->get_name()->ptr());
}
#ifdef DEBUG_WALK_COND
std::cerr << "------------------ ON_EXPR -----------------------" << endl;
#endif
// MCOL-3593 MDB now doesn't rewrite and/or consolidate ON and WHERE expressions
// and CS handles INNER ON expressions here.
if (!on_expr_list.is_empty())
{
List_iterator<Item> on_expr_it(on_expr_list);
Item_cond *on_expr = NULL;
while((on_expr = reinterpret_cast<Item_cond*>(on_expr_it++)))
{
on_expr->traverse_cond(gp_walk, &gwi, Item::POSTFIX);
#ifdef DEBUG_WALK_COND
on_expr->traverse_cond(debug_walk, &gwi, Item::POSTFIX);
#endif
}
}
#ifdef DEBUG_WALK_COND
std::cerr << "-------------------------------------------------\n" << std::endl;
#endif
// ZZ - the followinig debug shows the structure of nested outer join. should
// use a recursive function.
@@ -6500,7 +6500,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
// ptWorkStack empty, the item is in rcWorkStack.
// MySQL 5.6 (MariaDB?). when icp is null and zero_result_cause is set, a constant 0
// is pushed to rcWorkStack.
if (/*icp && */gwi.ptWorkStack.empty() && !gwi.rcWorkStack.empty())
if (gwi.ptWorkStack.empty() && !gwi.rcWorkStack.empty())
{
filters = new ParseTree(gwi.rcWorkStack.top());
gwi.rcWorkStack.pop();
@@ -6515,11 +6515,9 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
break;
ptp = new ParseTree(new LogicOperator("and"));
//ptp->left(filters);
ptp->right(filters);
lhs = gwi.ptWorkStack.top();
gwi.ptWorkStack.pop();
//ptp->right(rhs);
ptp->left(lhs);
gwi.ptWorkStack.push(ptp);
}
@@ -6532,6 +6530,68 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
filters->drawTree(aTmpDir);
}
return 0;
}
/*@brief Translates SELECT_LEX into CSEP */
/***********************************************************
* DESCRIPTION:
* This function takes SELECT_LEX and tries to produce
* a corresponding CSEP out of it. It is made of parts that
* process parts of the query, e.g. FROM, WHERE, SELECT,
* HAVING, GROUP BY, ORDER BY. FROM and WHERE are processed
* by processFrom(), processWhere(). CS calls getSelectPlan()
* recursively to process subqueries.
* ARGS
* isUnion if true CS processes UNION unit now
* isPushdownHand legacy to be removed
* RETURNS
* error id as an int
***********************************************************/
int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
SCSEP& csep,
bool isUnion,
bool isPushdownHand)
{
#ifdef DEBUG_WALK_COND
cerr << "getSelectPlan()" << endl;
#endif
int rc = 0;
// rollup is currently not supported
if (select_lex.olap == ROLLUP_TYPE)
{
gwi.fatalParseError = true;
gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_ROLLUP_NOT_SUPPORT);
setError(gwi.thd, ER_CHECK_NOT_IMPLEMENTED, gwi.parseErrorText, gwi);
return ER_CHECK_NOT_IMPLEMENTED;
}
setExecutionParams(gwi, csep);
gwi.subSelectType = csep->subType();
uint32_t sessionID = csep->sessionID();
gwi.sessionid = sessionID;
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
csc->identity(CalpontSystemCatalog::FE);
csep->timeZone(gwi.thd->variables.time_zone->get_name()->ptr());
gwi.csc = csc;
CalpontSelectExecutionPlan::SelectList derivedTbList;
// @bug 1796. Remember table order on the FROM list.
gwi.clauseType = FROM;
List<Item> on_expr_list;
if ((rc = processFrom(isUnion, select_lex, gwi, csep, on_expr_list)))
{
return rc;
}
bool unionSel = (!isUnion && select_lex.master_unit()->is_unit_op()) ? true : false;
gwi.clauseType = WHERE;
if ((rc = processWhere(select_lex, gwi, csep, on_expr_list)))
{
return rc;
}
gwi.clauseType = SELECT;
#ifdef DEBUG_WALK_COND
{
@@ -7084,7 +7144,6 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
for (uint32_t i = 0; i < funcFieldVec.size(); i++)
{
//SimpleColumn *sc = new SimpleColumn(funcFieldVec[i]->db_name, bestTableName(funcFieldVec[i])/*funcFieldVec[i]->table_name*/, funcFieldVec[i]->field_name, sessionID);
SimpleColumn* sc = buildSimpleColumn(funcFieldVec[i], gwi);
if (!sc || gwi.fatalParseError)
@@ -7110,7 +7169,6 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
String str;
funcFieldVec[i]->print(&str, QT_ORDINARY);
sc->alias(string(str.c_ptr()));
//sc->tableAlias(funcFieldVec[i]->table_name);
sc->tableAlias(sc->tableAlias());
SRCP srcp(sc);
uint32_t j = 0;
@@ -7867,8 +7925,9 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
{
uint32_t limitOffset = 0;
if (join)
if (select_lex.join)
{
JOIN* join = select_lex.join;
#if MYSQL_VERSION_ID >= 50172
// @bug5729. After upgrade, join->unit sometimes is uninitialized pointer

View File

@@ -413,6 +413,23 @@ int vbin2hex(const uint8_t* p, const unsigned l, char* o)
return 0;
}
// Table Map is used by both cond_push and table mode processing
// Entries made by cond_push don't have csep though.
// When
bool onlyOneTableinTM(cal_impl_if::cal_connection_info* ci)
{
size_t counter = 0;
for (auto &tableMapEntry: ci->tableMap)
{
if (tableMapEntry.second.csep)
counter++;
if (counter >= 1)
return false;
}
return true;
}
int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci, bool handler_flag = false)
{
int rc = HA_ERR_END_OF_FILE;
@@ -2441,7 +2458,9 @@ int ha_mcs_impl_rnd_init(TABLE* table)
csep = ti.csep;
// for ExeMgr logging sqltext. only log once for the query although multi plans may be sent
if (ci->tableMap.size() == 1)
// CS adds the ti into TM in the end of rnd_init thus we log the SQL
// only once when there is no ti with csep.
if (onlyOneTableinTM(ci))
{
ti.csep->data(idb_mysql_query_str(thd));
}
@@ -3251,7 +3270,6 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table)
aCmdLine = aCmdLine + table->s->db.str + " " + table->s->table_name.str ;
//cout << "aCmdLine = " << aCmdLine << endl;
std::istringstream ss(aCmdLine);
std::string arg;
std::vector<std::string> v2(20, "");
@@ -3278,19 +3296,6 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table)
saAttr.lpSecurityDescriptor = NULL;
HANDLE handleList[2];
const char* pSectionMsg;
// Create a pipe for the child process's STDOUT.
#if 0 // We don't need stdout to come back right now.
pSectionMsg = "Create Stdout";
bSuccess = CreatePipe(&ci->cpimport_stdout_Rd, &ci->cpimport_stdout_Wr, &saAttr, 0);
// Ensure the read handle to the pipe for STDIN is not inherited.
if (bSuccess)
{
pSectionMsg = "SetHandleInformation(stdout)";
bSuccess = SetHandleInformation(ci->cpimport_stdout_Rd, HANDLE_FLAG_INHERIT, 0);
}
#endif
bSuccess = true;
// Create a pipe for the child process's STDIN.
@@ -3340,10 +3345,8 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table)
pSectionMsg = "UpdateProcThreadAttribute";
bInitialized = true;
handleList[0] = ci->cpimport_stdin_Rd;
// handleList[1] = ci->cpimport_stdout_Wr;
bSuccess = UpdateProcThreadAttribute(lpAttributeList,
0, PROC_THREAD_ATTRIBUTE_HANDLE_LIST,
// handleList, 2*sizeof(HANDLE), NULL, NULL);
handleList, sizeof(HANDLE), NULL, NULL);
}
@@ -3365,8 +3368,6 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table)
siStartInfo.lpAttributeList = lpAttributeList;
siStartInfo.StartupInfo.hStdError = NULL;
siStartInfo.StartupInfo.hStdOutput = NULL;
// siStartInfo.StartupInfo.hStdError = ci->cpimport_stdout_Wr;
// siStartInfo.StartupInfo.hStdOutput = ci->cpimport_stdout_Wr;
siStartInfo.StartupInfo.hStdInput = ci->cpimport_stdin_Rd;
siStartInfo.StartupInfo.dwFlags |= STARTF_USESTDHANDLES;
// Create the child process.
@@ -3485,14 +3486,11 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table)
{
ci->filePtr = fdopen(ci->fdt[1], "w");
ci->cpimport_pid = aChPid; // This is the child PID
//cout << "Child PID is " << aChPid << endl;
close(ci->fdt[0]); //close the READER of PARENT
ci->fdt[0] = -1;
// now we can send all the data thru FIFO[1], writer of PARENT
}
//if(aChPid == 0)
//cout << "******** Child finished its work ********" << endl;
#endif
}
else
@@ -3500,7 +3498,6 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table)
if (!ci->dmlProc)
{
ci->dmlProc = new MessageQueueClient("DMLProc");
//cout << "start_bulk_insert starts a client " << ci->dmlProc << " for session " << thd->thread_id << endl;
}
}
}
@@ -3616,22 +3613,6 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
// @bug 2515. Check command intead of vtable state
if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) && !ci->singleInsert )
{
//@Bug 2438. Only load data infile calls last batch process
/* if ( ci->isLoaddataInfile && ((thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) || (ci->useCpimport == 0))) {
//@Bug 2829 Handle ctrl-C
if ( thd->killed > 0 )
abort = true;
if ( !ci->dmlProc )
{
ci->dmlProc = new MessageQueueClient("DMLProc");
//cout << "end_bulk_insert starts a client " << ci->dmlProc << " for session " << thd->thread_id << endl;
}
rc = ha_mcs_impl_write_last_batch(table, *ci, abort);
}
else if ((ci->useCpimport > 0) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) && (!ci->singleInsert) && ((ci->isLoaddataInfile) ||
} */
if ((ci->useCpimport > 0) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) && (!ci->singleInsert) && ((ci->isLoaddataInfile) ||
((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) ||
((thd->lex)->sql_command == SQLCOM_INSERT_SELECT)) )
@@ -4076,9 +4057,7 @@ int ha_mcs_impl_external_lock(THD* thd, TABLE* table, int lock_type)
CalTableMap::iterator mapiter = ci->tableMap.find(table);
// make sure this is a release lock (2nd) call called in
// the table mode.
if (mapiter != ci->tableMap.end()
&& (mapiter->second.condInfo && mapiter->second.csep)
&& lock_type == 2)
if (mapiter != ci->tableMap.end() && mapiter->second.csep && lock_type == 2)
{
// table mode
if (mapiter->second.conn_hndl)

View File

@@ -189,7 +189,6 @@ struct cal_table_info
enum RowSources { FROM_ENGINE, FROM_FILE };
cal_table_info() : tpl_ctx(0),
//tpl_scan_ctx(0),
c(0),
msTablePtr(0),
conn_hndl(0),

View File

@@ -0,0 +1,245 @@
/* Copyright (C) 2019 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include "ha_mcs_opt_rewrites.h"
// Search simplify_joins() function in the server's code for detail
COND *
simplify_joins_(JOIN *join, List<TABLE_LIST> *join_list, COND *conds, bool top,
bool in_sj)
{
TABLE_LIST *table;
NESTED_JOIN *nested_join;
TABLE_LIST *prev_table= 0;
List_iterator<TABLE_LIST> li(*join_list);
bool straight_join= MY_TEST(join->select_options & SELECT_STRAIGHT_JOIN);
DBUG_ENTER("simplify_joins");
/*
Try to simplify join operations from join_list.
The most outer join operation is checked for conversion first.
*/
while ((table= li++))
{
table_map used_tables;
table_map not_null_tables= (table_map) 0;
if ((nested_join= table->nested_join))
{
/*
If the element of join_list is a nested join apply
the procedure to its nested join list first.
*/
if (table->on_expr)
{
Item *expr= table->on_expr;
/*
If an on expression E is attached to the table,
check all null rejected predicates in this expression.
If such a predicate over an attribute belonging to
an inner table of an embedded outer join is found,
the outer join is converted to an inner join and
the corresponding on expression is added to E.
*/
expr= simplify_joins_(join, &nested_join->join_list,
expr, FALSE, in_sj || table->sj_on_expr);
if (!table->prep_on_expr || expr != table->on_expr)
{
DBUG_ASSERT(expr);
table->on_expr= expr;
table->prep_on_expr= expr->copy_andor_structure(join->thd);
}
}
nested_join->used_tables= (table_map) 0;
nested_join->not_null_tables=(table_map) 0;
conds= simplify_joins_(join, &nested_join->join_list, conds, top,
in_sj || table->sj_on_expr);
used_tables= nested_join->used_tables;
not_null_tables= nested_join->not_null_tables;
/* The following two might become unequal after table elimination: */
nested_join->n_tables= nested_join->join_list.elements;
}
else
{
if (!table->prep_on_expr)
table->prep_on_expr= table->on_expr;
used_tables= table->get_map();
if (conds)
not_null_tables= conds->not_null_tables();
}
if (table->embedding)
{
table->embedding->nested_join->used_tables|= used_tables;
table->embedding->nested_join->not_null_tables|= not_null_tables;
}
if (!(table->outer_join & (JOIN_TYPE_LEFT | JOIN_TYPE_RIGHT)) ||
(used_tables & not_null_tables))
{
/*
For some of the inner tables there are conjunctive predicates
that reject nulls => the outer join can be replaced by an inner join.
*/
if (table->outer_join && !table->embedding && table->table)
table->table->maybe_null= FALSE;
table->outer_join= 0;
if (!(straight_join || table->straight))
{
table->dep_tables= 0;
TABLE_LIST *embedding= table->embedding;
while (embedding)
{
if (embedding->nested_join->join_list.head()->outer_join)
{
if (!embedding->sj_subq_pred)
table->dep_tables= embedding->dep_tables;
break;
}
embedding= embedding->embedding;
}
}
if (table->on_expr)
{
/* Add ON expression to the WHERE or upper-level ON condition. */
if (conds)
{
conds= and_conds(join->thd, conds, table->on_expr);
conds->top_level_item();
/* conds is always a new item as both cond and on_expr existed */
DBUG_ASSERT(!conds->is_fixed());
conds->fix_fields(join->thd, &conds);
}
else
conds= table->on_expr;
table->prep_on_expr= table->on_expr= 0;
}
}
/*
Only inner tables of non-convertible outer joins
remain with on_expr.
*/
if (table->on_expr)
{
table_map table_on_expr_used_tables= table->on_expr->used_tables();
table->dep_tables|= table_on_expr_used_tables;
if (table->embedding)
{
table->dep_tables&= ~table->embedding->nested_join->used_tables;
/*
Embedding table depends on tables used
in embedded on expressions.
*/
table->embedding->on_expr_dep_tables|= table_on_expr_used_tables;
}
else
table->dep_tables&= ~table->get_map();
}
if (prev_table)
{
/* The order of tables is reverse: prev_table follows table */
if (prev_table->straight || straight_join)
prev_table->dep_tables|= used_tables;
if (prev_table->on_expr)
{
prev_table->dep_tables|= table->on_expr_dep_tables;
table_map prev_used_tables= prev_table->nested_join ?
prev_table->nested_join->used_tables :
prev_table->get_map();
/*
If on expression contains only references to inner tables
we still make the inner tables dependent on the outer tables.
It would be enough to set dependency only on one outer table
for them. Yet this is really a rare case.
Note:
RAND_TABLE_BIT mask should not be counted as it
prevents update of inner table dependences.
For example it might happen if RAND() function
is used in JOIN ON clause.
*/
if (!((prev_table->on_expr->used_tables() &
~(OUTER_REF_TABLE_BIT | RAND_TABLE_BIT)) &
~prev_used_tables))
prev_table->dep_tables|= used_tables;
}
}
prev_table= table;
}
/*
Flatten nested joins that can be flattened.
no ON expression and not a semi-join => can be flattened.
*/
li.rewind();
while ((table= li++))
{
nested_join= table->nested_join;
if (table->sj_on_expr && !in_sj)
{
/*
If this is a semi-join that is not contained within another semi-join
leave it intact (otherwise it is flattened)
*/
/*
Make sure that any semi-join appear in
the join->select_lex->sj_nests list only once
*/
List_iterator_fast<TABLE_LIST> sj_it(join->select_lex->sj_nests);
TABLE_LIST *sj_nest;
while ((sj_nest= sj_it++))
{
if (table == sj_nest)
break;
}
if (sj_nest)
continue;
join->select_lex->sj_nests.push_back(table, join->thd->mem_root);
/*
Also, walk through semi-join children and mark those that are now
top-level
*/
TABLE_LIST *tbl;
List_iterator<TABLE_LIST> it(nested_join->join_list);
while ((tbl= it++))
{
if (!tbl->on_expr && tbl->table)
tbl->table->maybe_null= FALSE;
}
}
else if (nested_join && !table->on_expr)
{
TABLE_LIST *tbl;
List_iterator<TABLE_LIST> it(nested_join->join_list);
List<TABLE_LIST> repl_list;
while ((tbl= it++))
{
tbl->embedding= table->embedding;
if (!tbl->embedding && !tbl->on_expr && tbl->table)
tbl->table->maybe_null= FALSE;
tbl->join_list= table->join_list;
repl_list.push_back(tbl, join->thd->mem_root);
tbl->dep_tables|= table->dep_tables;
}
li.replace(repl_list);
}
}
DBUG_RETURN(conds);
}

View File

@@ -0,0 +1,26 @@
/* Copyright (C) 2019 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#ifndef HA_MCS_REWRITES
#define HA_MCS_REWRITES
#include "idb_mysql.h"
COND *simplify_joins_(JOIN *join, List<TABLE_LIST> *join_list, COND *conds, bool top, bool in_sj);
#endif

View File

@@ -21,6 +21,27 @@
void check_walk(const Item* item, void* arg);
void disable_indices_for_CEJ(THD *thd_)
{
TABLE_LIST* global_list;
for (global_list = thd_->lex->query_tables; global_list; global_list = global_list->next_global)
{
// MCOL-652 - doing this with derived tables can cause bad things to happen
if (!global_list->derived)
{
global_list->index_hints= new (thd_->mem_root) List<Index_hint>();
global_list->index_hints->push_front(new (thd_->mem_root)
Index_hint(INDEX_HINT_USE,
INDEX_HINT_MASK_JOIN,
NULL,
0), thd_->mem_root);
}
}
}
void mutate_optimizer_flags(THD *thd_)
{
// MCOL-2178 Disable all optimizer flags as it was in the fork.
@@ -756,6 +777,7 @@ create_columnstore_select_handler(THD* thd, SELECT_LEX* select_lex)
return handler;
}
// Remove this in 1.4.3
// Save the original group_list as it can be mutated by the
// optimizer which calls the remove_const() function
Group_list_ptrs *group_list_ptrs = NULL;
@@ -780,9 +802,24 @@ create_columnstore_select_handler(THD* thd, SELECT_LEX* select_lex)
// if execution fails.
if (!unsupported_feature)
{
// Most of optimizer_switch flags disabled in external_lock
join->optimization_state= JOIN::OPTIMIZATION_IN_PROGRESS;
join->optimize_inner();
disable_indices_for_CEJ(thd);
if (select_lex->handle_derived(thd->lex, DT_MERGE))
{
// early quit b/c of the error in handle_derived
return handler;
}
COND *conds = simplify_joins_(join, select_lex->join_list, join->conds, TRUE, FALSE);
select_lex->optimize_unflattened_subqueries(false);
if (conds)
{
#ifdef DEBUG_WALK_COND
conds->traverse_cond(cal_impl_if::debug_walk, NULL, Item::POSTFIX);
#endif
join->conds = conds;
}
// Impossible HAVING or WHERE
// TODO replace with function call
@@ -818,43 +855,18 @@ create_columnstore_select_handler(THD* thd, SELECT_LEX* select_lex)
}
}
if (!unsupported_feature)
{
handler= new ha_columnstore_select_handler(thd, select_lex);
// This is an ugly hack to call simplify_joins()
mcs_handler_info mhi= mcs_handler_info(reinterpret_cast<void*>(handler), SELECT);
// this::table is the place for the result set
int rc= ha_cs_impl_pushdown_init(&mhi, handler->table);
// Return SH if query execution is fine or fallback is disabled
if (!rc || !get_fallback_knob(thd))
return handler;
// Reset the DA and restore optimizer flags
// to allow query to fallback to other handlers
if (thd->get_stmt_da()->is_error())
{
thd->get_stmt_da()->reset_diagnostics_area();
restore_optimizer_flags(thd);
// Return SH even if init fails b/c CS changed SELECT_LEX structures
// with simplify_joins_()
if (rc)
unsupported_feature = true;
}
}
if (join->optimization_state != JOIN::NOT_OPTIMIZED)
{
if (!join->with_two_phase_optimization)
{
if (unsupported_feature && join->have_query_plan != JOIN::QEP_DELETED)
{
join->build_explain();
}
join->optimization_state= JOIN::OPTIMIZATION_DONE;
}
else
{
join->optimization_state= JOIN::OPTIMIZATION_PHASE_1_DONE;
}
return handler;
}
return NULL;

View File

@@ -23,6 +23,8 @@
#include "ha_mcs_sysvars.h"
#define NEED_CALPONT_EXTERNS
#include "ha_mcs_impl.h"
#include "ha_mcs_impl_if.h"
#include "ha_mcs_opt_rewrites.h"
void mutate_optimizer_flags(THD *thd_);
void restore_optimizer_flags(THD *thd_);

View File

@@ -98,15 +98,6 @@ static MYSQL_THDVAR_BOOL(
1
);
static MYSQL_THDVAR_BOOL(
processing_handlers_fallback,
PLUGIN_VAR_NOCMDARG,
"Enable/Disable the unsupported features check in handlers.",
NULL,
NULL,
0
);
static MYSQL_THDVAR_UINT(
orderby_threads,
PLUGIN_VAR_RQCMDARG,
@@ -304,7 +295,6 @@ st_mysql_sys_var* mcs_system_variables[] =
MYSQL_SYSVAR(original_optimizer_flags),
MYSQL_SYSVAR(select_handler),
MYSQL_SYSVAR(derived_handler),
MYSQL_SYSVAR(processing_handlers_fallback),
MYSQL_SYSVAR(group_by_handler),
MYSQL_SYSVAR(orderby_threads),
MYSQL_SYSVAR(decimal_scale),
@@ -391,16 +381,7 @@ void set_group_by_handler(THD* thd, bool value)
THDVAR(thd, group_by_handler) = value;
}
bool get_fallback_knob(THD* thd)
{
return ( thd == NULL ) ? false : THDVAR(thd, processing_handlers_fallback);
}
void set_fallback_knob(THD* thd, bool value)
{
THDVAR(thd, processing_handlers_fallback) = value;
}
void set_compression_type(THD* thd, ulong value)
void set_compression_type(THD* thd, ulong value)
{
THDVAR(thd, compression_type) = value;
}

View File

@@ -55,9 +55,6 @@ void set_derived_handler(THD* thd, bool value);
bool get_group_by_handler(THD* thd);
void set_group_by_handler(THD* thd, bool value);
bool get_fallback_knob(THD* thd);
void set_fallback_knob(THD* thd, bool value);
uint get_orderby_threads(THD* thd);
void set_orderby_threads(THD* thd, uint value);

View File

@@ -203,7 +203,6 @@ public:
private:
SELECT_LEX* fFromSub;
std::string fAlias;
bool fPushdownHand;
};
class SelectSubQuery : public SubQuery

View File

@@ -17,31 +17,31 @@ for arg in "$@"; do
done
mysql --force --user=root mysql 2> ${tmpdir}/mysql_install.log <<EOD
INSERT INTO mysql.func VALUES ('calgetstats',0,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('calsettrace',2,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('calsetparms',0,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('calflushcache',2,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('calgettrace',0,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('calgetversion',0,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('calonlinealter',2,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('calviewtablelock',0,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('calcleartablelock',0,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('callastinsertid',2,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('calgetsqlcount',0,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('idbpm',2,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('idbdbroot',2,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('idbsegment',2,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('idbsegmentdir',2,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('idbextentrelativerid',2,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('idbblockid',2,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('idbextentid',2,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('idbextentmin',0,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('idbextentmax',0,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('idbpartition',0,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('idblocalpm',2,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('mcssystemready',2,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('mcssystemreadonly',2,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('mcssystemprimary',2,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('calgetstats',0,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('calsettrace',2,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('calsetparms',0,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('calflushcache',2,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('calgettrace',0,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('calgetversion',0,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('calonlinealter',2,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('calviewtablelock',0,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('calcleartablelock',0,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('callastinsertid',2,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('calgetsqlcount',0,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('idbpm',2,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('idbdbroot',2,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('idbsegment',2,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('idbsegmentdir',2,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('idbextentrelativerid',2,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('idbblockid',2,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('idbextentid',2,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('idbextentmin',0,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('idbextentmax',0,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('idbpartition',0,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('idblocalpm',2,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('mcssystemready',2,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('mcssystemreadonly',2,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('mcssystemprimary',2,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('regr_avgx',1,'libregr_mysql.so','aggregate');
INSERT INTO mysql.func VALUES ('regr_avgy',1,'libregr_mysql.so','aggregate');
INSERT INTO mysql.func VALUES ('regr_count',2,'libregr_mysql.so','aggregate');
@@ -55,14 +55,14 @@ INSERT INTO mysql.func VALUES ('regr_sxy',1,'libregr_mysql.so','aggregate');
INSERT INTO mysql.func VALUES ('covar_pop',1,'libregr_mysql.so','aggregate');
INSERT INTO mysql.func VALUES ('covar_samp',1,'libregr_mysql.so','aggregate');
INSERT INTO mysql.func VALUES ('distinct_count',2,'libudf_mysql.so','aggregate');
INSERT INTO mysql.func VALUES ('caldisablepartitions',0,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('calenablepartitions',0,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('caldroppartitions',0,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('calshowpartitions',0,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('caldroppartitionsbyvalue',0,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('caldisablepartitionsbyvalue',0,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('calenablepartitionsbyvalue',0,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('calshowpartitionsbyvalue',0,'libcalmysql.so','function');
INSERT INTO mysql.func VALUES ('caldisablepartitions',0,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('calenablepartitions',0,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('caldroppartitions',0,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('calshowpartitions',0,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('caldroppartitionsbyvalue',0,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('caldisablepartitionsbyvalue',0,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('calenablepartitionsbyvalue',0,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('calshowpartitionsbyvalue',0,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('moda',4,'libregr_mysql.so','aggregate');
CREATE DATABASE IF NOT EXISTS infinidb_querystats;

View File

@@ -0,0 +1,23 @@
/* Copyright (C) 2019 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#pragma once
int is_columnstore_extents_plugin_init(void* p);
int is_columnstore_files_plugin_init(void* p);
int is_columnstore_tables_plugin_init(void* p);
int is_columnstore_columns_plugin_init(void* p);

View File

@@ -28,6 +28,7 @@
#include "calpontsystemcatalog.h"
#include "dataconvert.h"
#include "exceptclasses.h"
#include "is_columnstore.h"
using namespace logging;
@@ -260,7 +261,7 @@ static int is_columnstore_columns_fill(THD* thd, TABLE_LIST* tables, COND* cond)
return 0;
}
static int is_columnstore_columns_plugin_init(void* p)
int is_columnstore_columns_plugin_init(void* p)
{
ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*) p;
schema->fields_info = is_columnstore_columns_fields;
@@ -268,26 +269,3 @@ static int is_columnstore_columns_plugin_init(void* p)
return 0;
}
static struct st_mysql_information_schema is_columnstore_columns_plugin_version =
{ MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION };
maria_declare_plugin(is_columnstore_columns_plugin)
{
MYSQL_INFORMATION_SCHEMA_PLUGIN,
&is_columnstore_columns_plugin_version,
"COLUMNSTORE_COLUMNS",
"MariaDB Corporation",
"An information schema plugin to list ColumnStore columns",
PLUGIN_LICENSE_GPL,
is_columnstore_columns_plugin_init,
//is_columnstore_tables_plugin_deinit,
NULL,
0x0100,
NULL,
NULL,
"1.0",
MariaDB_PLUGIN_MATURITY_STABLE
}
maria_declare_plugin_end;

View File

@@ -26,7 +26,7 @@
#include "dbrm.h"
#include "objectidmanager.h"
#include "is_columnstore.h"
// Required declaration as it isn't in a MairaDB include
bool schema_table_store_record(THD* thd, TABLE* table);
@@ -278,34 +278,10 @@ static int is_columnstore_extents_fill(THD* thd, TABLE_LIST* tables, COND* cond)
return 0;
}
static int is_columnstore_extents_plugin_init(void* p)
int is_columnstore_extents_plugin_init(void* p)
{
ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*) p;
schema->fields_info = is_columnstore_extents_fields;
schema->fill_table = is_columnstore_extents_fill;
return 0;
}
static struct st_mysql_information_schema is_columnstore_extents_plugin_version =
{ MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION };
maria_declare_plugin(is_columnstore_extents_plugin)
{
MYSQL_INFORMATION_SCHEMA_PLUGIN,
&is_columnstore_extents_plugin_version,
"COLUMNSTORE_EXTENTS",
"MariaDB Corporation",
"An information schema plugin to list ColumnStore extents",
PLUGIN_LICENSE_GPL,
is_columnstore_extents_plugin_init,
//is_columnstore_extents_plugin_deinit,
NULL,
0x0100,
NULL,
NULL,
"1.0",
MariaDB_PLUGIN_MATURITY_STABLE
}
maria_declare_plugin_end;

View File

@@ -35,6 +35,7 @@
#include "messagequeue.h"
#include "messagequeuepool.h"
#include "we_messages.h"
#include "is_columnstore.h"
// Required declaration as it isn't in a MairaDB include
bool schema_table_store_record(THD* thd, TABLE* table);
@@ -289,7 +290,7 @@ static int is_columnstore_files_fill(THD* thd, TABLE_LIST* tables, COND* cond)
return 0;
}
static int is_columnstore_files_plugin_init(void* p)
int is_columnstore_files_plugin_init(void* p)
{
ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*) p;
schema->fields_info = is_columnstore_files_fields;
@@ -297,24 +298,3 @@ static int is_columnstore_files_plugin_init(void* p)
return 0;
}
static struct st_mysql_information_schema is_columnstore_files_plugin_version =
{ MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION };
maria_declare_plugin(is_columnstore_files_plugin)
{
MYSQL_INFORMATION_SCHEMA_PLUGIN,
&is_columnstore_files_plugin_version,
"COLUMNSTORE_FILES",
"MariaDB Corporation",
"An information schema plugin to list ColumnStore filess",
PLUGIN_LICENSE_GPL,
is_columnstore_files_plugin_init,
//is_columnstore_files_plugin_deinit,
NULL,
0x0100,
NULL,
NULL,
"1.0",
MariaDB_PLUGIN_MATURITY_STABLE
}
maria_declare_plugin_end;

View File

@@ -26,7 +26,7 @@
#include <boost/shared_ptr.hpp>
#include "calpontsystemcatalog.h"
#include "dataconvert.h"
#include "is_columnstore.h"
// Required declaration as it isn't in a MairaDB include
bool schema_table_store_record(THD* thd, TABLE* table);
@@ -158,7 +158,7 @@ static int is_columnstore_tables_fill(THD* thd, TABLE_LIST* tables, COND* cond)
return 0;
}
static int is_columnstore_tables_plugin_init(void* p)
int is_columnstore_tables_plugin_init(void* p)
{
ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*) p;
schema->fields_info = is_columnstore_tables_fields;
@@ -166,26 +166,3 @@ static int is_columnstore_tables_plugin_init(void* p)
return 0;
}
static struct st_mysql_information_schema is_columnstore_tables_plugin_version =
{ MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION };
maria_declare_plugin(is_columnstore_tables_plugin)
{
MYSQL_INFORMATION_SCHEMA_PLUGIN,
&is_columnstore_tables_plugin_version,
"COLUMNSTORE_TABLES",
"MariaDB Corporation",
"An information schema plugin to list ColumnStore tables",
PLUGIN_LICENSE_GPL,
is_columnstore_tables_plugin_init,
//is_columnstore_tables_plugin_deinit,
NULL,
0x0100,
NULL,
NULL,
"1.0",
MariaDB_PLUGIN_MATURITY_STABLE
}
maria_declare_plugin_end;

View File

@@ -1,102 +0,0 @@
// Microsoft Visual C++ generated resource script.
//
#include "resource.h"
#define APSTUDIO_READONLY_SYMBOLS
/////////////////////////////////////////////////////////////////////////////
//
// Generated from the TEXTINCLUDE 2 resource.
//
#include "afxres.h"
/////////////////////////////////////////////////////////////////////////////
#undef APSTUDIO_READONLY_SYMBOLS
/////////////////////////////////////////////////////////////////////////////
// English (U.S.) resources
#if !defined(AFX_RESOURCE_DLL) || defined(AFX_TARG_ENU)
#ifdef _WIN32
LANGUAGE LANG_ENGLISH, SUBLANG_ENGLISH_US
#pragma code_page(1252)
#endif //_WIN32
#ifdef APSTUDIO_INVOKED
/////////////////////////////////////////////////////////////////////////////
//
// TEXTINCLUDE
//
1 TEXTINCLUDE
BEGIN
"resource.h\0"
END
2 TEXTINCLUDE
BEGIN
"#include ""afxres.h""\r\n"
"\0"
END
3 TEXTINCLUDE
BEGIN
"\r\n"
"\0"
END
#endif // APSTUDIO_INVOKED
/////////////////////////////////////////////////////////////////////////////
//
// Version
//
VS_VERSION_INFO VERSIONINFO
FILEVERSION 4,6,0,0
PRODUCTVERSION 4,6,0,0
FILEFLAGSMASK 0x17L
#ifdef _DEBUG
FILEFLAGS 0x1L
#else
FILEFLAGS 0x0L
#endif
FILEOS 0x4L
FILETYPE 0x2L
FILESUBTYPE 0x0L
BEGIN
BLOCK "StringFileInfo"
BEGIN
BLOCK "040904b0"
BEGIN
VALUE "CompanyName", "InfiniDB, Inc."
VALUE "FileDescription", "InfiniDB MySQL Connector API"
VALUE "FileVersion", "4.6.0-0"
VALUE "InternalName", "libcalmysql"
VALUE "LegalCopyright", "Copyright (C) 2014"
VALUE "OriginalFilename", "libcalmysql.dll"
VALUE "ProductName", "InfiniDB"
VALUE "ProductVersion", "4.6.0.0 Beta"
END
END
BLOCK "VarFileInfo"
BEGIN
VALUE "Translation", 0x409, 1200
END
END
#endif // English (U.S.) resources
/////////////////////////////////////////////////////////////////////////////
#ifndef APSTUDIO_INVOKED
/////////////////////////////////////////////////////////////////////////////
//
// Generated from the TEXTINCLUDE 3 resource.
//
/////////////////////////////////////////////////////////////////////////////
#endif // not APSTUDIO_INVOKED

View File

@@ -1,14 +0,0 @@
//{{NO_DEPENDENCIES}}
// Microsoft Visual C++ generated include file.
// Used by libcalmysql.rc
// Next default values for new objects
//
#ifdef APSTUDIO_INVOKED
#ifndef APSTUDIO_READONLY_SYMBOLS
#define _APS_NEXT_RESOURCE_VALUE 101
#define _APS_NEXT_COMMAND_VALUE 40001
#define _APS_NEXT_CONTROL_VALUE 1001
#define _APS_NEXT_SYMED_VALUE 101
#endif
#endif

View File

@@ -400,7 +400,7 @@ tpl_close ( cpsm_tplh_t* ntplh,
// MCOL-1601 Dispose of unused empty RowGroup
if (clear_scan_ctx)
{
std::cout << "tpl_close() clear_scan_ctx read" << std::endl;
SMDEBUGLOG << "tpl_close() clear_scan_ctx read" << std::endl;
bs = hndl->exeMgr->read();
}