You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-01 06:46:55 +03:00
MCOL-1201 Add support for UDAF multiple parm constants
This commit is contained in:
@ -38,6 +38,8 @@ class ByteStream;
|
|||||||
*/
|
*/
|
||||||
namespace execplan
|
namespace execplan
|
||||||
{
|
{
|
||||||
|
class ConstantColumn;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief A class to represent a constant return column
|
* @brief A class to represent a constant return column
|
||||||
*
|
*
|
||||||
|
@ -405,7 +405,7 @@ uint32_t getTupleKey(JobInfo& jobInfo, const SRCP& srcp, bool add)
|
|||||||
|
|
||||||
if (add)
|
if (add)
|
||||||
{
|
{
|
||||||
// setTupleInfo first if add is ture, ok if already set.
|
// setTupleInfo first if add is true, ok if already set.
|
||||||
const SimpleColumn* sc = dynamic_cast<const SimpleColumn*>(srcp.get());
|
const SimpleColumn* sc = dynamic_cast<const SimpleColumn*>(srcp.get());
|
||||||
|
|
||||||
if (sc != NULL)
|
if (sc != NULL)
|
||||||
|
@ -300,6 +300,7 @@ const JobStepVector doProject(const RetColsVector& retCols, JobInfo& jobInfo)
|
|||||||
{
|
{
|
||||||
const ArithmeticColumn* ac = NULL;
|
const ArithmeticColumn* ac = NULL;
|
||||||
const FunctionColumn* fc = NULL;
|
const FunctionColumn* fc = NULL;
|
||||||
|
const ConstantColumn* cc = NULL;
|
||||||
uint64_t eid = -1;
|
uint64_t eid = -1;
|
||||||
CalpontSystemCatalog::ColType ct;
|
CalpontSystemCatalog::ColType ct;
|
||||||
ExpressionStep* es = new ExpressionStep(jobInfo);
|
ExpressionStep* es = new ExpressionStep(jobInfo);
|
||||||
@ -316,6 +317,11 @@ const JobStepVector doProject(const RetColsVector& retCols, JobInfo& jobInfo)
|
|||||||
eid = fc->expressionId();
|
eid = fc->expressionId();
|
||||||
ct = fc->resultType();
|
ct = fc->resultType();
|
||||||
}
|
}
|
||||||
|
else if ((cc = dynamic_cast<const ConstantColumn*>(retCols[i].get())) != NULL)
|
||||||
|
{
|
||||||
|
eid = cc->expressionId();
|
||||||
|
ct = cc->resultType();
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
std::ostringstream errmsg;
|
std::ostringstream errmsg;
|
||||||
@ -1004,7 +1010,9 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
|
|||||||
|
|
||||||
for (uint32_t parm = 0; parm < aggParms.size(); ++parm)
|
for (uint32_t parm = 0; parm < aggParms.size(); ++parm)
|
||||||
{
|
{
|
||||||
if (aggc->constCol().get() != NULL)
|
// Only do the optimization of converting to count(*) if
|
||||||
|
// there is only one parameter.
|
||||||
|
if (aggParms.size() == 1 && aggc->constCol().get() != NULL)
|
||||||
{
|
{
|
||||||
// replace the aggregate on constant with a count(*)
|
// replace the aggregate on constant with a count(*)
|
||||||
SRCP clone;
|
SRCP clone;
|
||||||
|
@ -569,6 +569,7 @@ void WindowFunctionStep::initialize(const RowGroup& rg, JobInfo& jobInfo)
|
|||||||
|
|
||||||
for (RetColsVector::iterator i = jobInfo.windowCols.begin(); i < jobInfo.windowCols.end(); i++)
|
for (RetColsVector::iterator i = jobInfo.windowCols.begin(); i < jobInfo.windowCols.end(); i++)
|
||||||
{
|
{
|
||||||
|
bool isUDAF = false;
|
||||||
// window function type
|
// window function type
|
||||||
WindowFunctionColumn* wc = dynamic_cast<WindowFunctionColumn*>(i->get());
|
WindowFunctionColumn* wc = dynamic_cast<WindowFunctionColumn*>(i->get());
|
||||||
uint64_t ridx = getColumnIndex(*i, colIndexMap, jobInfo); // result index
|
uint64_t ridx = getColumnIndex(*i, colIndexMap, jobInfo); // result index
|
||||||
@ -590,6 +591,7 @@ void WindowFunctionStep::initialize(const RowGroup& rg, JobInfo& jobInfo)
|
|||||||
// if (boost::iequals(wc->functionName(),"UDAF_FUNC")
|
// if (boost::iequals(wc->functionName(),"UDAF_FUNC")
|
||||||
if (wc->functionName() == "UDAF_FUNC")
|
if (wc->functionName() == "UDAF_FUNC")
|
||||||
{
|
{
|
||||||
|
isUDAF = true;
|
||||||
++wfsUserFunctionCount;
|
++wfsUserFunctionCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -646,10 +648,13 @@ void WindowFunctionStep::initialize(const RowGroup& rg, JobInfo& jobInfo)
|
|||||||
// column type for functor templates
|
// column type for functor templates
|
||||||
int ct = 0;
|
int ct = 0;
|
||||||
|
|
||||||
|
if (isUDAF)
|
||||||
|
{
|
||||||
|
ct = wc->getUDAFContext().getResultType();
|
||||||
|
}
|
||||||
// make sure index is in range
|
// make sure index is in range
|
||||||
if (fields.size() > 1 && fields[1] >= 0 && static_cast<uint64_t>(fields[1]) < types.size())
|
else if (fields.size() > 1 && fields[1] >= 0 && static_cast<uint64_t>(fields[1]) < types.size())
|
||||||
ct = types[fields[1]];
|
ct = types[fields[1]];
|
||||||
|
|
||||||
// workaround for functions using "within group (order by)" syntax
|
// workaround for functions using "within group (order by)" syntax
|
||||||
string fn = boost::to_upper_copy(wc->functionName());
|
string fn = boost::to_upper_copy(wc->functionName());
|
||||||
|
|
||||||
|
@ -1677,15 +1677,11 @@ void BatchPrimitiveProcessor::execute()
|
|||||||
}
|
}
|
||||||
catch (logging::QueryDataExcept& qex)
|
catch (logging::QueryDataExcept& qex)
|
||||||
{
|
{
|
||||||
ostringstream os;
|
writeErrorMsg(qex.what(), qex.errorCode());
|
||||||
os << qex.what() << endl;
|
|
||||||
writeErrorMsg(os.str(), qex.errorCode());
|
|
||||||
}
|
}
|
||||||
catch (logging::DictionaryBufferOverflow& db)
|
catch (logging::DictionaryBufferOverflow& db)
|
||||||
{
|
{
|
||||||
ostringstream os;
|
writeErrorMsg(db.what(), db.errorCode());
|
||||||
os << db.what() << endl;
|
|
||||||
writeErrorMsg(os.str(), db.errorCode());
|
|
||||||
}
|
}
|
||||||
catch (scalar_exception& se)
|
catch (scalar_exception& se)
|
||||||
{
|
{
|
||||||
@ -1758,15 +1754,11 @@ void BatchPrimitiveProcessor::execute()
|
|||||||
}
|
}
|
||||||
catch (IDBExcept& iex)
|
catch (IDBExcept& iex)
|
||||||
{
|
{
|
||||||
ostringstream os;
|
writeErrorMsg(iex.what(), iex.errorCode(), true, false);
|
||||||
os << iex.what() << endl;
|
|
||||||
writeErrorMsg(os.str(), iex.errorCode(), true, false);
|
|
||||||
}
|
}
|
||||||
catch (const std::exception& ex)
|
catch (const std::exception& ex)
|
||||||
{
|
{
|
||||||
ostringstream os;
|
writeErrorMsg(ex.what(), logging::batchPrimitiveProcessorErr);
|
||||||
os << ex.what() << endl;
|
|
||||||
writeErrorMsg(os.str(), logging::batchPrimitiveProcessorErr);
|
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
|
@ -11,15 +11,12 @@
|
|||||||
|
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
|
#include <cstring>
|
||||||
|
|
||||||
namespace static_any
|
namespace static_any
|
||||||
{
|
{
|
||||||
namespace anyimpl
|
namespace anyimpl
|
||||||
{
|
{
|
||||||
struct bad_any_cast
|
|
||||||
{
|
|
||||||
};
|
|
||||||
|
|
||||||
struct empty_any
|
struct empty_any
|
||||||
{
|
{
|
||||||
};
|
};
|
||||||
@ -266,7 +263,7 @@ public:
|
|||||||
T& cast()
|
T& cast()
|
||||||
{
|
{
|
||||||
if (policy != anyimpl::get_policy<T>())
|
if (policy != anyimpl::get_policy<T>())
|
||||||
throw anyimpl::bad_any_cast();
|
throw std::runtime_error("static_any: type mismatch in cast");
|
||||||
T* r = reinterpret_cast<T*>(policy->get_value(&object));
|
T* r = reinterpret_cast<T*>(policy->get_value(&object));
|
||||||
return *r;
|
return *r;
|
||||||
}
|
}
|
||||||
|
@ -29,7 +29,7 @@ using namespace std;
|
|||||||
namespace logging
|
namespace logging
|
||||||
{
|
{
|
||||||
|
|
||||||
ErrorCodes::ErrorCodes(): fErrorCodes(), fPreamble("An unexpected condition within the query caused an internal processing error within InfiniDB. Please check the log files for more details. Additional Information: ")
|
ErrorCodes::ErrorCodes(): fErrorCodes(), fPreamble("An unexpected condition within the query caused an internal processing error within Columnstore. Please check the log files for more details. Additional Information: ")
|
||||||
{
|
{
|
||||||
fErrorCodes[batchPrimitiveStepErr] = "error in BatchPrimitiveStep.";
|
fErrorCodes[batchPrimitiveStepErr] = "error in BatchPrimitiveStep.";
|
||||||
fErrorCodes[tupleBPSErr] = "error in TupleBPS.";
|
fErrorCodes[tupleBPSErr] = "error in TupleBPS.";
|
||||||
|
@ -35,6 +35,7 @@
|
|||||||
|
|
||||||
#include "exceptclasses.h"
|
#include "exceptclasses.h"
|
||||||
#include "serializeable.h"
|
#include "serializeable.h"
|
||||||
|
#include "any.hpp"
|
||||||
|
|
||||||
class ByteStreamTestSuite;
|
class ByteStreamTestSuite;
|
||||||
|
|
||||||
|
@ -50,6 +50,7 @@
|
|||||||
#include "stlpoolallocator.h"
|
#include "stlpoolallocator.h"
|
||||||
#include "returnedcolumn.h"
|
#include "returnedcolumn.h"
|
||||||
#include "mcsv1_udaf.h"
|
#include "mcsv1_udaf.h"
|
||||||
|
#include "constantcolumn.h"
|
||||||
|
|
||||||
// To do: move code that depends on joblist to a proper subsystem.
|
// To do: move code that depends on joblist to a proper subsystem.
|
||||||
namespace joblist
|
namespace joblist
|
||||||
@ -200,6 +201,13 @@ struct RowAggFunctionCol
|
|||||||
// 4. for duplicate - point to the real aggretate column to be copied from
|
// 4. for duplicate - point to the real aggretate column to be copied from
|
||||||
// Set only on UM, the fAuxColumnIndex is defaulted to fOutputColumnIndex+1 on PM.
|
// Set only on UM, the fAuxColumnIndex is defaulted to fOutputColumnIndex+1 on PM.
|
||||||
uint32_t fAuxColumnIndex;
|
uint32_t fAuxColumnIndex;
|
||||||
|
|
||||||
|
// For UDAF that have more than one parameter and some parameters are constant.
|
||||||
|
// There will be a series of RowAggFunctionCol created, one for each parameter.
|
||||||
|
// The first will be a RowUDAFFunctionCol. Subsequent ones will be RowAggFunctionCol
|
||||||
|
// with fAggFunction == ROWAGG_MULTI_PARM. Order is important.
|
||||||
|
// If this parameter is constant, that value is here.
|
||||||
|
SRCP fpConstCol;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
@ -220,8 +228,11 @@ struct RowUDAFFunctionCol : public RowAggFunctionCol
|
|||||||
inputColIndex, outputColIndex, auxColIndex),
|
inputColIndex, outputColIndex, auxColIndex),
|
||||||
bInterrupted(false)
|
bInterrupted(false)
|
||||||
{}
|
{}
|
||||||
RowUDAFFunctionCol(const RowUDAFFunctionCol& rhs) : RowAggFunctionCol(ROWAGG_UDAF, ROWAGG_FUNCT_UNDEFINE,
|
RowUDAFFunctionCol(const RowUDAFFunctionCol& rhs) :
|
||||||
rhs.fInputColumnIndex, rhs.fOutputColumnIndex, rhs.fAuxColumnIndex), fUDAFContext(rhs.fUDAFContext)
|
RowAggFunctionCol(ROWAGG_UDAF, ROWAGG_FUNCT_UNDEFINE, rhs.fInputColumnIndex,
|
||||||
|
rhs.fOutputColumnIndex, rhs.fAuxColumnIndex),
|
||||||
|
fUDAFContext(rhs.fUDAFContext),
|
||||||
|
bInterrupted(false)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
virtual ~RowUDAFFunctionCol() {}
|
virtual ~RowUDAFFunctionCol() {}
|
||||||
@ -238,6 +249,16 @@ inline void RowAggFunctionCol::serialize(messageqcpp::ByteStream& bs) const
|
|||||||
bs << (uint8_t)fAggFunction;
|
bs << (uint8_t)fAggFunction;
|
||||||
bs << fInputColumnIndex;
|
bs << fInputColumnIndex;
|
||||||
bs << fOutputColumnIndex;
|
bs << fOutputColumnIndex;
|
||||||
|
if (fpConstCol)
|
||||||
|
{
|
||||||
|
bs << (uint8_t)1;
|
||||||
|
fpConstCol.get()->serialize(bs);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
bs << (uint8_t)0;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void RowAggFunctionCol::deserialize(messageqcpp::ByteStream& bs)
|
inline void RowAggFunctionCol::deserialize(messageqcpp::ByteStream& bs)
|
||||||
@ -245,6 +266,13 @@ inline void RowAggFunctionCol::deserialize(messageqcpp::ByteStream& bs)
|
|||||||
bs >> (uint8_t&)fAggFunction;
|
bs >> (uint8_t&)fAggFunction;
|
||||||
bs >> fInputColumnIndex;
|
bs >> fInputColumnIndex;
|
||||||
bs >> fOutputColumnIndex;
|
bs >> fOutputColumnIndex;
|
||||||
|
uint8_t t;
|
||||||
|
bs >> t;
|
||||||
|
if (t)
|
||||||
|
{
|
||||||
|
fpConstCol.reset(new ConstantColumn);
|
||||||
|
fpConstCol.get()->unserialize(bs);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void RowUDAFFunctionCol::serialize(messageqcpp::ByteStream& bs) const
|
inline void RowUDAFFunctionCol::serialize(messageqcpp::ByteStream& bs) const
|
||||||
@ -586,7 +614,7 @@ protected:
|
|||||||
virtual void doAvg(const Row&, int64_t, int64_t, int64_t);
|
virtual void doAvg(const Row&, int64_t, int64_t, int64_t);
|
||||||
virtual void doStatistics(const Row&, int64_t, int64_t, int64_t);
|
virtual void doStatistics(const Row&, int64_t, int64_t, int64_t);
|
||||||
virtual void doBitOp(const Row&, int64_t, int64_t, int);
|
virtual void doBitOp(const Row&, int64_t, int64_t, int);
|
||||||
virtual void doUDAF(const Row&, int64_t, int64_t, int64_t, RowUDAFFunctionCol* rowUDAF, uint64_t& funcColsIdx);
|
virtual void doUDAF(const Row&, int64_t, int64_t, int64_t, uint64_t& funcColsIdx);
|
||||||
virtual bool countSpecial(const RowGroup* pRG)
|
virtual bool countSpecial(const RowGroup* pRG)
|
||||||
{
|
{
|
||||||
fRow.setIntField<8>(fRow.getIntField<8>(0) + pRG->getRowCount(), 0);
|
fRow.setIntField<8>(fRow.getIntField<8>(0) + pRG->getRowCount(), 0);
|
||||||
@ -902,7 +930,7 @@ protected:
|
|||||||
void doStatistics(const Row&, int64_t, int64_t, int64_t);
|
void doStatistics(const Row&, int64_t, int64_t, int64_t);
|
||||||
void doGroupConcat(const Row&, int64_t, int64_t);
|
void doGroupConcat(const Row&, int64_t, int64_t);
|
||||||
void doBitOp(const Row&, int64_t, int64_t, int);
|
void doBitOp(const Row&, int64_t, int64_t, int);
|
||||||
void doUDAF(const Row&, int64_t, int64_t, int64_t, RowUDAFFunctionCol* rowUDAF, uint64_t& funcColsIdx);
|
void doUDAF(const Row&, int64_t, int64_t, int64_t, uint64_t& funcColsIdx);
|
||||||
bool countSpecial(const RowGroup* pRG)
|
bool countSpecial(const RowGroup* pRG)
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
|
@ -48,7 +48,6 @@
|
|||||||
#include <cstdlib>
|
#include <cstdlib>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <boost/any.hpp>
|
|
||||||
#ifdef _MSC_VER
|
#ifdef _MSC_VER
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#else
|
#else
|
||||||
|
@ -56,7 +56,6 @@
|
|||||||
#include <cstdlib>
|
#include <cstdlib>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <boost/any.hpp>
|
|
||||||
#ifdef _MSC_VER
|
#ifdef _MSC_VER
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#else
|
#else
|
||||||
|
@ -35,7 +35,6 @@
|
|||||||
#include <cstdlib>
|
#include <cstdlib>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <boost/any.hpp>
|
|
||||||
#ifdef _MSC_VER
|
#ifdef _MSC_VER
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#else
|
#else
|
||||||
|
@ -68,7 +68,6 @@
|
|||||||
#include <vector>
|
#include <vector>
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <boost/shared_ptr.hpp>
|
#include <boost/shared_ptr.hpp>
|
||||||
#include <boost/any.hpp>
|
|
||||||
#ifdef _MSC_VER
|
#ifdef _MSC_VER
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#else
|
#else
|
||||||
|
@ -56,7 +56,6 @@
|
|||||||
#include <cstdlib>
|
#include <cstdlib>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <boost/any.hpp>
|
|
||||||
#ifdef _MSC_VER
|
#ifdef _MSC_VER
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#else
|
#else
|
||||||
|
@ -82,7 +82,7 @@ mcsv1_UDAF::ReturnCode regr_avgx::nextValue(mcsv1Context* context, ColumnDatum*
|
|||||||
{
|
{
|
||||||
return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on.
|
return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on.
|
||||||
}
|
}
|
||||||
if (valIn_x.empty() || valIn_y.empty())
|
if (valIn_x.empty() || valIn_y.empty()) // Usually empty if NULL. Probably redundant
|
||||||
{
|
{
|
||||||
return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on.
|
return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on.
|
||||||
}
|
}
|
||||||
@ -107,10 +107,6 @@ mcsv1_UDAF::ReturnCode regr_avgx::nextValue(mcsv1Context* context, ColumnDatum*
|
|||||||
{
|
{
|
||||||
val = valIn_x.cast<int>();
|
val = valIn_x.cast<int>();
|
||||||
}
|
}
|
||||||
else if (valIn_x.compatible(longTypeId))
|
|
||||||
{
|
|
||||||
val = valIn_x.cast<long>();
|
|
||||||
}
|
|
||||||
else if (valIn_x.compatible(llTypeId))
|
else if (valIn_x.compatible(llTypeId))
|
||||||
{
|
{
|
||||||
val = valIn_x.cast<long long>();
|
val = valIn_x.cast<long long>();
|
||||||
|
@ -35,7 +35,6 @@
|
|||||||
#include <cstdlib>
|
#include <cstdlib>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <boost/any.hpp>
|
|
||||||
#ifdef _MSC_VER
|
#ifdef _MSC_VER
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#else
|
#else
|
||||||
|
@ -56,7 +56,6 @@
|
|||||||
#include <cstdlib>
|
#include <cstdlib>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <boost/any.hpp>
|
|
||||||
#ifdef _MSC_VER
|
#ifdef _MSC_VER
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#else
|
#else
|
||||||
|
@ -238,38 +238,5 @@
|
|||||||
N="Makefile"
|
N="Makefile"
|
||||||
Type="Makefile"/>
|
Type="Makefile"/>
|
||||||
</Folder>
|
</Folder>
|
||||||
<Folder
|
|
||||||
Name="doc"
|
|
||||||
Filters="*.rst">
|
|
||||||
<Folder
|
|
||||||
Name="reference"
|
|
||||||
Filters="">
|
|
||||||
<F N="docs/source/reference/api.rst"/>
|
|
||||||
<F N="docs/source/reference/ByteStream.rst"/>
|
|
||||||
<F N="docs/source/reference/ColumnDatum.rst"/>
|
|
||||||
<F N="docs/source/reference/index.rst"/>
|
|
||||||
<F N="docs/source/reference/MariaDBUDAF.rst"/>
|
|
||||||
<F N="docs/source/reference/mcsv1_UDAF.rst"/>
|
|
||||||
<F N="docs/source/reference/mcsv1Context.rst"/>
|
|
||||||
<F N="docs/source/reference/UDAFMap.rst"/>
|
|
||||||
<F N="docs/source/reference/UserData.rst"/>
|
|
||||||
</Folder>
|
|
||||||
<Folder
|
|
||||||
Name="usage"
|
|
||||||
Filters="">
|
|
||||||
<F N="docs/source/usage/cmakelists.rst"/>
|
|
||||||
<F N="docs/source/usage/compile.rst"/>
|
|
||||||
<F N="docs/source/usage/copylibs.rst"/>
|
|
||||||
<F N="docs/source/usage/headerfile.rst"/>
|
|
||||||
<F N="docs/source/usage/index.rst"/>
|
|
||||||
<F N="docs/source/usage/introduction.rst"/>
|
|
||||||
<F N="docs/source/usage/memoryallocation.rst"/>
|
|
||||||
<F N="docs/source/usage/sourcefile.rst"/>
|
|
||||||
</Folder>
|
|
||||||
<F N="docs/source/changelog.rst"/>
|
|
||||||
<F N="docs/source/conf.py"/>
|
|
||||||
<F N="docs/source/index.rst"/>
|
|
||||||
<F N="docs/source/license.rst"/>
|
|
||||||
</Folder>
|
|
||||||
</Files>
|
</Files>
|
||||||
</Project>
|
</Project>
|
||||||
|
@ -451,7 +451,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
|
|||||||
{
|
{
|
||||||
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
|
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
|
||||||
uint64_t colOut = fFieldIndex[0];
|
uint64_t colOut = fFieldIndex[0];
|
||||||
|
bool isNull = false;
|
||||||
if ((fFrameUnit == WF__FRAME_ROWS) ||
|
if ((fFrameUnit == WF__FRAME_ROWS) ||
|
||||||
(fPrev == -1) ||
|
(fPrev == -1) ||
|
||||||
(!fPeer->operator()(getPointer(fRowData->at(c)), getPointer(fRowData->at(fPrev)))))
|
(!fPeer->operator()(getPointer(fRowData->at(c)), getPointer(fRowData->at(fPrev)))))
|
||||||
@ -468,14 +468,25 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
|
|||||||
|
|
||||||
// Put the parameter metadata (type, scale, precision) into valsIn
|
// Put the parameter metadata (type, scale, precision) into valsIn
|
||||||
mcsv1sdk::ColumnDatum valsIn[getContext().getParameterCount()];
|
mcsv1sdk::ColumnDatum valsIn[getContext().getParameterCount()];
|
||||||
|
ConstantColumn* cc = NULL;
|
||||||
for (uint32_t i = 0; i < getContext().getParameterCount(); ++i)
|
for (uint32_t i = 0; i < getContext().getParameterCount(); ++i)
|
||||||
{
|
{
|
||||||
uint64_t colIn = fFieldIndex[i+1];
|
|
||||||
mcsv1sdk::ColumnDatum& datum = valsIn[i];
|
mcsv1sdk::ColumnDatum& datum = valsIn[i];
|
||||||
|
cc = static_cast<ConstantColumn*>(fConstantParms[i].get());
|
||||||
|
if (cc)
|
||||||
|
{
|
||||||
|
datum.dataType = cc->resultType().colDataType;
|
||||||
|
datum.scale = cc->resultType().scale;
|
||||||
|
datum.precision = cc->resultType().precision;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
uint64_t colIn = fFieldIndex[i+1];
|
||||||
datum.dataType = fRow.getColType(colIn);
|
datum.dataType = fRow.getColType(colIn);
|
||||||
datum.scale = fRow.getScale(colIn);
|
datum.scale = fRow.getScale(colIn);
|
||||||
datum.precision = fRow.getPrecision(colIn);
|
datum.precision = fRow.getPrecision(colIn);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (b <= c && c <= e)
|
if (b <= c && c <= e)
|
||||||
getContext().setContextFlag(mcsv1sdk::CONTEXT_HAS_CURRENT_ROW);
|
getContext().setContextFlag(mcsv1sdk::CONTEXT_HAS_CURRENT_ROW);
|
||||||
@ -494,12 +505,14 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
|
|||||||
uint32_t flags[getContext().getParameterCount()];
|
uint32_t flags[getContext().getParameterCount()];
|
||||||
for (uint32_t k = 0; k < getContext().getParameterCount(); ++k)
|
for (uint32_t k = 0; k < getContext().getParameterCount(); ++k)
|
||||||
{
|
{
|
||||||
|
cc = static_cast<ConstantColumn*>(fConstantParms[k].get());
|
||||||
uint64_t colIn = fFieldIndex[k+1];
|
uint64_t colIn = fFieldIndex[k+1];
|
||||||
mcsv1sdk::ColumnDatum& datum = valsIn[k];
|
mcsv1sdk::ColumnDatum& datum = valsIn[k];
|
||||||
|
|
||||||
// Turn on Null flags or skip based on respect nulls
|
// Turn on Null flags or skip based on respect nulls
|
||||||
flags[k] = 0;
|
flags[k] = 0;
|
||||||
if (fRow.isNullValue(colIn) == true)
|
if ((!cc && fRow.isNullValue(colIn) == true)
|
||||||
|
|| (cc && cc->type() == ConstantColumn::NULLDATA))
|
||||||
{
|
{
|
||||||
if (!bRespectNulls)
|
if (!bRespectNulls)
|
||||||
{
|
{
|
||||||
@ -510,7 +523,8 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
|
|||||||
flags[k] |= mcsv1sdk::PARAM_IS_NULL;
|
flags[k] |= mcsv1sdk::PARAM_IS_NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// MCOL-1201 Multi-Paramter calls
|
if (!bHasNull && !(flags[k] & mcsv1sdk::PARAM_IS_NULL))
|
||||||
|
{
|
||||||
switch (datum.dataType)
|
switch (datum.dataType)
|
||||||
{
|
{
|
||||||
case CalpontSystemCatalog::TINYINT:
|
case CalpontSystemCatalog::TINYINT:
|
||||||
@ -518,10 +532,44 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
|
|||||||
case CalpontSystemCatalog::MEDINT:
|
case CalpontSystemCatalog::MEDINT:
|
||||||
case CalpontSystemCatalog::INT:
|
case CalpontSystemCatalog::INT:
|
||||||
case CalpontSystemCatalog::BIGINT:
|
case CalpontSystemCatalog::BIGINT:
|
||||||
case CalpontSystemCatalog::DECIMAL:
|
|
||||||
{
|
{
|
||||||
int64_t valIn;
|
int64_t valIn;
|
||||||
|
if (cc)
|
||||||
|
{
|
||||||
|
valIn = cc->getIntVal(fRow, isNull);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
getValue(colIn, valIn);
|
getValue(colIn, valIn);
|
||||||
|
}
|
||||||
|
// Check for distinct, if turned on.
|
||||||
|
// Currently, distinct only works on the first parameter.
|
||||||
|
if (k == 0)
|
||||||
|
{
|
||||||
|
if ((fDistinct) || (fDistinctSet.find(valIn) != fDistinctSet.end()))
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fDistinct)
|
||||||
|
fDistinctSet.insert(valIn);
|
||||||
|
}
|
||||||
|
datum.columnData = valIn;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case CalpontSystemCatalog::DECIMAL:
|
||||||
|
case CalpontSystemCatalog::UDECIMAL:
|
||||||
|
{
|
||||||
|
int64_t valIn;
|
||||||
|
if (cc)
|
||||||
|
{
|
||||||
|
valIn = cc->getDecimalVal(fRow, isNull).value;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
getValue(colIn, valIn);
|
||||||
|
}
|
||||||
// Check for distinct, if turned on.
|
// Check for distinct, if turned on.
|
||||||
// Currently, distinct only works on the first parameter.
|
// Currently, distinct only works on the first parameter.
|
||||||
if (k == 0)
|
if (k == 0)
|
||||||
@ -543,10 +591,16 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
|
|||||||
case CalpontSystemCatalog::UMEDINT:
|
case CalpontSystemCatalog::UMEDINT:
|
||||||
case CalpontSystemCatalog::UINT:
|
case CalpontSystemCatalog::UINT:
|
||||||
case CalpontSystemCatalog::UBIGINT:
|
case CalpontSystemCatalog::UBIGINT:
|
||||||
case CalpontSystemCatalog::UDECIMAL:
|
|
||||||
{
|
{
|
||||||
uint64_t valIn;
|
uint64_t valIn;
|
||||||
|
if (cc)
|
||||||
|
{
|
||||||
|
valIn = cc->getUintVal(fRow, isNull);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
getValue(colIn, valIn);
|
getValue(colIn, valIn);
|
||||||
|
}
|
||||||
// Check for distinct, if turned on.
|
// Check for distinct, if turned on.
|
||||||
// Currently, distinct only works on the first parameter.
|
// Currently, distinct only works on the first parameter.
|
||||||
if (k == 0)
|
if (k == 0)
|
||||||
@ -567,7 +621,14 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
|
|||||||
case CalpontSystemCatalog::UDOUBLE:
|
case CalpontSystemCatalog::UDOUBLE:
|
||||||
{
|
{
|
||||||
double valIn;
|
double valIn;
|
||||||
|
if (cc)
|
||||||
|
{
|
||||||
|
valIn = cc->getDoubleVal(fRow, isNull);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
getValue(colIn, valIn);
|
getValue(colIn, valIn);
|
||||||
|
}
|
||||||
// Check for distinct, if turned on.
|
// Check for distinct, if turned on.
|
||||||
// Currently, distinct only works on the first parameter.
|
// Currently, distinct only works on the first parameter.
|
||||||
if (k == 0)
|
if (k == 0)
|
||||||
@ -588,7 +649,14 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
|
|||||||
case CalpontSystemCatalog::UFLOAT:
|
case CalpontSystemCatalog::UFLOAT:
|
||||||
{
|
{
|
||||||
float valIn;
|
float valIn;
|
||||||
|
if (cc)
|
||||||
|
{
|
||||||
|
valIn = cc->getFloatVal(fRow, isNull);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
getValue(colIn, valIn);
|
getValue(colIn, valIn);
|
||||||
|
}
|
||||||
// Check for distinct, if turned on.
|
// Check for distinct, if turned on.
|
||||||
// Currently, distinct only works on the first parameter.
|
// Currently, distinct only works on the first parameter.
|
||||||
if (k == 0)
|
if (k == 0)
|
||||||
@ -612,7 +680,14 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
|
|||||||
case CalpontSystemCatalog::BLOB:
|
case CalpontSystemCatalog::BLOB:
|
||||||
{
|
{
|
||||||
string valIn;
|
string valIn;
|
||||||
|
if (cc)
|
||||||
|
{
|
||||||
|
valIn = cc->getStrVal(fRow, isNull);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
getValue(colIn, valIn);
|
getValue(colIn, valIn);
|
||||||
|
}
|
||||||
// Check for distinct, if turned on.
|
// Check for distinct, if turned on.
|
||||||
// Currently, distinct only works on the first parameter.
|
// Currently, distinct only works on the first parameter.
|
||||||
if (k == 0)
|
if (k == 0)
|
||||||
@ -639,6 +714,7 @@ void WF_udaf<T>::operator()(int64_t b, int64_t e, int64_t c)
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
// Skip if any value is NULL and respect nulls is off.
|
// Skip if any value is NULL and respect nulls is off.
|
||||||
if (bHasNull)
|
if (bHasNull)
|
||||||
{
|
{
|
||||||
|
@ -53,8 +53,6 @@ public:
|
|||||||
|
|
||||||
// A class to control the execution of User Define Analytic Functions (UDAnF)
|
// A class to control the execution of User Define Analytic Functions (UDAnF)
|
||||||
// as defined by a specialization of mcsv1sdk::mcsv1_UDAF
|
// as defined by a specialization of mcsv1sdk::mcsv1_UDAF
|
||||||
// The template parameter is currently only used to support DISTINCT, as
|
|
||||||
// as that is done via a set<T>
|
|
||||||
template<typename T>
|
template<typename T>
|
||||||
class WF_udaf : public WindowFunctionType
|
class WF_udaf : public WindowFunctionType
|
||||||
{
|
{
|
||||||
|
@ -39,7 +39,6 @@ using namespace logging;
|
|||||||
using namespace ordering;
|
using namespace ordering;
|
||||||
|
|
||||||
#include "calpontsystemcatalog.h"
|
#include "calpontsystemcatalog.h"
|
||||||
#include "constantcolumn.h"
|
|
||||||
#include "dataconvert.h" // int64_t IDB_pow[19]
|
#include "dataconvert.h" // int64_t IDB_pow[19]
|
||||||
using namespace execplan;
|
using namespace execplan;
|
||||||
|
|
||||||
@ -228,6 +227,9 @@ WindowFunctionType::makeWindowFunction(const string& name, int ct, WindowFunctio
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Copy the only the constant parameter pointers
|
||||||
|
af->constParms(wc->functionParms());
|
||||||
|
|
||||||
return af;
|
return af;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -634,6 +636,26 @@ void* WindowFunctionType::getNullValueByType(int ct, int pos)
|
|||||||
return v;
|
return v;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void WindowFunctionType::constParms(const std::vector<SRCP>& functionParms)
|
||||||
|
{
|
||||||
|
// fConstantParms will end up with a copy of functionParms, but only
|
||||||
|
// the constant types will be copied. Other types will take up space but
|
||||||
|
// be NULL. This allows us to acces the constants without the overhead
|
||||||
|
// of dynamic_cast for every row.
|
||||||
|
for (size_t i = 0; i < functionParms.size(); ++i)
|
||||||
|
{
|
||||||
|
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(functionParms[i].get());
|
||||||
|
if (cc)
|
||||||
|
{
|
||||||
|
fConstantParms.push_back(functionParms[i]);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
fConstantParms.push_back(SRCP(cc));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} //namespace
|
} //namespace
|
||||||
// vim:ts=4 sw=4:
|
// vim:ts=4 sw=4:
|
||||||
|
|
||||||
|
@ -31,7 +31,7 @@
|
|||||||
#include "returnedcolumn.h"
|
#include "returnedcolumn.h"
|
||||||
#include "rowgroup.h"
|
#include "rowgroup.h"
|
||||||
#include "windowframe.h"
|
#include "windowframe.h"
|
||||||
|
#include "constantcolumn.h"
|
||||||
|
|
||||||
namespace ordering
|
namespace ordering
|
||||||
{
|
{
|
||||||
@ -198,6 +198,8 @@ public:
|
|||||||
fStep = step;
|
fStep = step;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void constParms(const std::vector<SRCP>& functionParms);
|
||||||
|
|
||||||
static boost::shared_ptr<WindowFunctionType> makeWindowFunction(const std::string&, int ct, WindowFunctionColumn* wc);
|
static boost::shared_ptr<WindowFunctionType> makeWindowFunction(const std::string&, int ct, WindowFunctionColumn* wc);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
@ -244,6 +246,9 @@ protected:
|
|||||||
// output and input field indices: [0] - output
|
// output and input field indices: [0] - output
|
||||||
std::vector<int64_t> fFieldIndex;
|
std::vector<int64_t> fFieldIndex;
|
||||||
|
|
||||||
|
// constant function parameters -- needed for udaf with constant
|
||||||
|
std::vector<SRCP> fConstantParms;
|
||||||
|
|
||||||
// row meta data
|
// row meta data
|
||||||
rowgroup::RowGroup fRowGroup;
|
rowgroup::RowGroup fRowGroup;
|
||||||
rowgroup::Row fRow;
|
rowgroup::Row fRow;
|
||||||
|
Reference in New Issue
Block a user