You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
MCOL-4809 The patch replaces legacy scanning/filtering code with a number of templates that
simplifies control flow removing needless expressions
This commit is contained in:
@ -6,8 +6,6 @@ include_directories( ${ENGINE_COMMON_INCLUDES} ../blockcache ../primproc)
|
||||
|
||||
set(processor_STAT_SRCS primitiveprocessor.cpp dictionary.cpp column.cpp)
|
||||
|
||||
#libprocessor_a_CXXFLAGS = $(march_flags) $(AM_CXXFLAGS)
|
||||
|
||||
add_library(processor STATIC ${processor_STAT_SRCS})
|
||||
|
||||
add_dependencies(processor loggingcpp)
|
||||
@ -15,10 +13,3 @@ add_dependencies(processor loggingcpp)
|
||||
target_link_libraries(processor ${NETSNMP_LIBRARIES})
|
||||
|
||||
INSTALL (TARGETS processor DESTINATION ${ENGINE_LIBDIR})
|
||||
|
||||
#if (WITH_PP_SCAN_UT)
|
||||
# add_executable(pp_scan_unittest pp-scan-unittest.cpp)
|
||||
# target_link_libraries(pp_scan_unittest ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit)
|
||||
# install(TARGETS pp_scan_unittest DESTINATION ${ENGINE_BINDIR} COMPONENT columnstore-engine)
|
||||
#endif()
|
||||
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -3238,7 +3238,7 @@ public:
|
||||
args->COP = COMPARE_LT;
|
||||
memcpy(args->val, &tmp, in->DataSize);
|
||||
args = reinterpret_cast<ColArgs*>(&input[sizeof(NewColRequestHeader) +
|
||||
sizeof(ColArgs) + in->DataSize]);
|
||||
sizeof(ColArgs) + in->DataSize]);
|
||||
args->COP = COMPARE_GT;
|
||||
tmp = 1000;
|
||||
memcpy(args->val, &tmp, in->DataSize);
|
||||
|
@ -58,10 +58,17 @@ void PrimitiveProcessor::setParsedColumnFilter(boost::shared_ptr<ParsedColumnFil
|
||||
parsedColumnFilter = pcf;
|
||||
}
|
||||
|
||||
ParsedColumnFilter::ParsedColumnFilter() : columnFilterMode(STANDARD)
|
||||
ParsedColumnFilter::ParsedColumnFilter() : columnFilterMode(ALWAYS_TRUE), mFilterCount(0)
|
||||
{
|
||||
}
|
||||
|
||||
ParsedColumnFilter::ParsedColumnFilter(const uint32_t aFilterCount)
|
||||
: columnFilterMode(ALWAYS_TRUE), mFilterCount(aFilterCount)
|
||||
{
|
||||
prestored_rfs.reset(new uint8_t[mFilterCount]);
|
||||
prestored_cops.reset(new uint8_t[mFilterCount]);
|
||||
}
|
||||
|
||||
ParsedColumnFilter::~ParsedColumnFilter()
|
||||
{
|
||||
}
|
||||
|
@ -1,4 +1,5 @@
|
||||
/* Copyright (C) 2014 InfiniDB, Inc.
|
||||
Copyright (C) 2016-2021 MariaDB Corporation
|
||||
|
||||
This program is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU General Public License
|
||||
@ -59,11 +60,18 @@ namespace primitives
|
||||
|
||||
enum ColumnFilterMode
|
||||
{
|
||||
STANDARD,
|
||||
TWO_ARRAYS,
|
||||
UNORDERED_SET
|
||||
ALWAYS_TRUE, // empty filter is always true
|
||||
SINGLE_COMPARISON, // exactly one comparison operation
|
||||
ANY_COMPARISON_TRUE, // ANY comparison is true (BOP_OR)
|
||||
ALL_COMPARISONS_TRUE, // ALL comparisons are true (BOP_AND)
|
||||
XOR_COMPARISONS, // XORing results of comparisons (BOP_XOR)
|
||||
ONE_OF_VALUES_IN_SET, // ONE of the values in the set is equal to the value checked (BOP_OR + all COMPARE_EQ)
|
||||
NONE_OF_VALUES_IN_SET, // NONE of the values in the set is equal to the value checked (BOP_AND + all COMPARE_NE)
|
||||
ONE_OF_VALUES_IN_ARRAY, // ONE of the values in the small set represented by an array (BOP_OR + all COMPARE_EQ)
|
||||
NONE_OF_VALUES_IN_ARRAY,// NONE of the values in the small set represented by an array (BOP_AND + all COMPARE_NE)
|
||||
};
|
||||
|
||||
// TBD Test if avalance makes lookup in the hash maps based on this hashers faster.
|
||||
class pcfHasher
|
||||
{
|
||||
public:
|
||||
@ -100,6 +108,7 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
// TBD Test robinhood as tr1 set replacement.
|
||||
typedef std::tr1::unordered_set<int64_t, pcfHasher, pcfEqual> prestored_set_t;
|
||||
typedef std::tr1::unordered_set<int128_t, pcfHasher128, pcfEqual128> prestored_set_t_128;
|
||||
|
||||
@ -124,9 +133,39 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
struct ParsedColumnFilter
|
||||
// Not the safest way b/c it doesn't cover uint128_t but the type
|
||||
// isn't used now and the approach saves some space.
|
||||
template <typename T, typename D = void>
|
||||
struct IntegralTypeToFilterType
|
||||
{
|
||||
using type = int64_t;
|
||||
};
|
||||
|
||||
template<>
|
||||
struct IntegralTypeToFilterType<int128_t>
|
||||
{
|
||||
using type = int128_t;
|
||||
};
|
||||
|
||||
template <typename T, typename D = void>
|
||||
struct IntegralTypeToFilterSetType
|
||||
{
|
||||
using type = prestored_set_t;
|
||||
};
|
||||
|
||||
template<>
|
||||
struct IntegralTypeToFilterSetType<int128_t>
|
||||
{
|
||||
using type = prestored_set_t_128;
|
||||
};
|
||||
|
||||
// DRRTuy shared_arrays and shared_ptr looks redundant here
|
||||
// given that the higher level code uses shared_ptr<ParsedColumnFilter>
|
||||
// thus runtime calls ParsedColumnFilter dtor in the end.
|
||||
class ParsedColumnFilter
|
||||
{
|
||||
public:
|
||||
static constexpr uint32_t noSetFilterThreshold = 8;
|
||||
ColumnFilterMode columnFilterMode;
|
||||
boost::shared_array<int64_t> prestored_argVals;
|
||||
boost::shared_array<int128_t> prestored_argVals128;
|
||||
@ -136,7 +175,92 @@ struct ParsedColumnFilter
|
||||
boost::shared_ptr<prestored_set_t_128> prestored_set_128;
|
||||
|
||||
ParsedColumnFilter();
|
||||
ParsedColumnFilter(const uint32_t aFilterCount);
|
||||
~ParsedColumnFilter();
|
||||
|
||||
template<typename T,
|
||||
typename std::enable_if<std::is_same<T, int64_t>::value, T>::type* = nullptr>
|
||||
T* getFilterVals()
|
||||
{
|
||||
return prestored_argVals.get();
|
||||
}
|
||||
|
||||
template<typename T,
|
||||
typename std::enable_if<std::is_same<T, int128_t>::value, T>::type* = nullptr>
|
||||
T* getFilterVals()
|
||||
{
|
||||
return prestored_argVals128.get();
|
||||
}
|
||||
|
||||
template<typename T,
|
||||
typename std::enable_if<std::is_same<T, prestored_set_t>::value, T>::type* = nullptr>
|
||||
T* getFilterSet()
|
||||
{
|
||||
return prestored_set.get();
|
||||
}
|
||||
|
||||
template<typename T,
|
||||
typename std::enable_if<std::is_same<T, prestored_set_t_128>::value, T>::type* = nullptr>
|
||||
T* getFilterSet()
|
||||
{
|
||||
return prestored_set_128.get();
|
||||
}
|
||||
|
||||
template<typename T,
|
||||
typename std::enable_if<sizeof(T) <= sizeof(int64_t), T>::type* = nullptr>
|
||||
void storeFilterArg(const uint32_t argIndex, const T* argValPtr)
|
||||
{
|
||||
prestored_argVals[argIndex] = *argValPtr;
|
||||
}
|
||||
|
||||
template<typename WT,
|
||||
typename std::enable_if<sizeof(WT) == sizeof(int128_t), WT>::type* = nullptr>
|
||||
void storeFilterArg(const uint32_t argIndex, const WT* argValPtr)
|
||||
{
|
||||
datatypes::TSInt128::assignPtrPtr(&(prestored_argVals128[argIndex]),
|
||||
argValPtr);
|
||||
}
|
||||
|
||||
template<typename T,
|
||||
typename std::enable_if<sizeof(T) <= sizeof(int64_t), T>::type* = nullptr>
|
||||
void allocateSpaceForFilterArgs()
|
||||
{
|
||||
prestored_argVals.reset(new int64_t[mFilterCount]);
|
||||
}
|
||||
|
||||
template<typename WT,
|
||||
typename std::enable_if<sizeof(WT) == sizeof(int128_t), WT>::type* = nullptr>
|
||||
void allocateSpaceForFilterArgs()
|
||||
{
|
||||
prestored_argVals128.reset(new int128_t[mFilterCount]);
|
||||
}
|
||||
|
||||
template<typename WT,
|
||||
typename std::enable_if<sizeof(WT) == sizeof(int128_t), WT>::type* = nullptr>
|
||||
void populatePrestoredSet()
|
||||
{
|
||||
prestored_set_128.reset(new prestored_set_t_128());
|
||||
|
||||
// @bug 2584, use COMPARE_NIL for "= null" to allow "is null" in OR expression
|
||||
for (uint32_t argIndex = 0; argIndex < mFilterCount; ++argIndex)
|
||||
if (prestored_rfs[argIndex] == 0)
|
||||
prestored_set_128->insert(prestored_argVals128[argIndex]);
|
||||
}
|
||||
|
||||
template<typename T,
|
||||
typename std::enable_if<sizeof(T) <= sizeof(int64_t), T>::type* = nullptr>
|
||||
void populatePrestoredSet()
|
||||
{
|
||||
prestored_set.reset(new prestored_set_t());
|
||||
|
||||
// @bug 2584, use COMPARE_NIL for "= null" to allow "is null" in OR expression
|
||||
for (uint32_t argIndex = 0; argIndex < mFilterCount; argIndex++)
|
||||
if (prestored_rfs[argIndex] == 0)
|
||||
prestored_set->insert(prestored_argVals[argIndex]);
|
||||
}
|
||||
|
||||
private:
|
||||
uint32_t mFilterCount;
|
||||
};
|
||||
|
||||
//@bug 1828 These need to be public so that column operations can use it for 'like'
|
||||
@ -146,9 +270,6 @@ struct p_DataValue
|
||||
const uint8_t* data;
|
||||
};
|
||||
|
||||
boost::shared_ptr<ParsedColumnFilter> parseColumnFilter(const uint8_t* filterString,
|
||||
uint32_t colWidth, uint32_t colType, uint32_t filterCount, uint32_t BOP);
|
||||
|
||||
/** @brief This class encapsulates the primitive processing functionality of the system.
|
||||
*
|
||||
* This class encapsulates the primitive processing functionality of the system.
|
||||
@ -258,7 +379,7 @@ public:
|
||||
* a NewColResultHeader, followed by the output type specified by in->OutputType.
|
||||
* \li If OT_RID, it will be an array of RIDs
|
||||
* \li If OT_DATAVALUE, it will be an array of matching data values stored in the column
|
||||
* \li If OT_BOTH, it will be an array of <DataValue, RID> pairs
|
||||
* \li If OT_BOTH, it will be an array of <RID, DataValue> pairs
|
||||
* @param outSize The size of the output buffer in bytes.
|
||||
* @param written (out parameter) A pointer to 1 int, which will contain the
|
||||
* number of bytes written to out.
|
||||
@ -267,8 +388,41 @@ public:
|
||||
void p_Col(NewColRequestHeader* in, NewColResultHeader* out, unsigned outSize,
|
||||
unsigned* written);
|
||||
|
||||
template<typename T,
|
||||
typename std::enable_if<sizeof(T) == sizeof(int8_t) ||
|
||||
sizeof(T) == sizeof(int16_t) ||
|
||||
sizeof(T) == sizeof(int128_t), T>::type* = nullptr>
|
||||
void scanAndFilterTypeDispatcher(NewColRequestHeader* in, NewColResultHeader* out,
|
||||
unsigned outSize, unsigned* written);
|
||||
|
||||
template<typename T,
|
||||
typename std::enable_if<sizeof(T) == sizeof(int32_t), T>::type* = nullptr>
|
||||
void scanAndFilterTypeDispatcher(NewColRequestHeader* in, NewColResultHeader* out,
|
||||
unsigned outSize, unsigned* written);
|
||||
template<typename T,
|
||||
typename std::enable_if<sizeof(T) == sizeof(int64_t), T>::type* = nullptr>
|
||||
void scanAndFilterTypeDispatcher(NewColRequestHeader* in, NewColResultHeader* out,
|
||||
unsigned outSize, unsigned* written);
|
||||
|
||||
template<typename T,
|
||||
typename std::enable_if<sizeof(T) <= sizeof(int64_t), T>::type* = nullptr>
|
||||
void _scanAndFilterTypeDispatcher(NewColRequestHeader* in, NewColResultHeader* out,
|
||||
unsigned outSize, unsigned* written);
|
||||
|
||||
template<typename T,
|
||||
typename std::enable_if<sizeof(T) == sizeof(int128_t), T>::type* = nullptr>
|
||||
void _scanAndFilterTypeDispatcher(NewColRequestHeader* in, NewColResultHeader* out,
|
||||
unsigned outSize, unsigned* written);
|
||||
|
||||
template<typename T>
|
||||
void columnScanAndFilter(NewColRequestHeader* in, NewColResultHeader* out,
|
||||
unsigned outSize, unsigned* written);
|
||||
|
||||
boost::shared_ptr<ParsedColumnFilter> parseColumnFilter(const uint8_t* filterString,
|
||||
uint32_t colWidth, uint32_t colType, uint32_t filterCount, uint32_t BOP);
|
||||
uint32_t colWidth,
|
||||
uint32_t colType,
|
||||
uint32_t filterCount,
|
||||
uint32_t BOP);
|
||||
void setParsedColumnFilter(boost::shared_ptr<ParsedColumnFilter>);
|
||||
|
||||
/** @brief The p_ColAggregate primitive processor.
|
||||
@ -312,9 +466,6 @@ private:
|
||||
void nextSig(int NVALS, const PrimToken* tokens, p_DataValue* ret,
|
||||
uint8_t outputFlags = 0, bool oldGetSigBehavior = false, bool skipNulls = false) throw();
|
||||
|
||||
// void do_sum8(NewColAggResultHeader *out, int64_t val);
|
||||
// void do_unsignedsum8(NewColAggResultHeader *out, int64_t val);
|
||||
|
||||
uint64_t masks[11];
|
||||
int dict_OffsetIndex, currentOffsetIndex; // used by p_dictionary
|
||||
int fDebugLevel;
|
||||
@ -326,6 +477,108 @@ private:
|
||||
friend class ::PrimTest;
|
||||
};
|
||||
|
||||
//
|
||||
//COMPILE A COLUMN FILTER
|
||||
//
|
||||
// Compile column filter from BLOB into structure optimized for fast filtering.
|
||||
// Return a shared_ptr for the compiled filter.
|
||||
template<typename T> // C++ integer type providing storage for colType
|
||||
boost::shared_ptr<ParsedColumnFilter> _parseColumnFilter(
|
||||
const uint8_t* filterString, // Filter represented as BLOB
|
||||
uint32_t colType, // Column datatype as ColDataType
|
||||
uint32_t filterCount, // Number of filter elements contained in filterString
|
||||
uint32_t BOP) // Operation (and/or/xor/none) that combines all filter elements
|
||||
{
|
||||
using UT = typename std::conditional<std::is_unsigned<T>::value || datatypes::is_uint128_t<T>::value, T, typename datatypes::make_unsigned<T>::type>::type;
|
||||
const uint32_t WIDTH = sizeof(T); // Sizeof of the column to be filtered
|
||||
|
||||
boost::shared_ptr<ParsedColumnFilter> ret; // Place for building the value to return
|
||||
if (filterCount == 0)
|
||||
return ret;
|
||||
|
||||
// Allocate the compiled filter structure with space for filterCount filters.
|
||||
// No need to init arrays since they will be filled on the fly.
|
||||
ret.reset(new ParsedColumnFilter(filterCount));
|
||||
ret->allocateSpaceForFilterArgs<T>();
|
||||
|
||||
// Choose initial filter mode based on operation and number of filter elements
|
||||
if (filterCount == 1)
|
||||
ret->columnFilterMode = SINGLE_COMPARISON;
|
||||
else if (BOP == BOP_OR)
|
||||
ret->columnFilterMode = ANY_COMPARISON_TRUE;
|
||||
else if (BOP == BOP_AND)
|
||||
ret->columnFilterMode = ALL_COMPARISONS_TRUE;
|
||||
else if (BOP == BOP_XOR)
|
||||
ret->columnFilterMode = XOR_COMPARISONS;
|
||||
else
|
||||
idbassert(0); // BOP_NONE is compatible only with filterCount <= 1
|
||||
|
||||
// Size of single filter element in filterString BLOB
|
||||
const uint32_t filterSize = sizeof(uint8_t) + sizeof(uint8_t) + WIDTH;
|
||||
|
||||
// Parse the filter predicates and insert them into argVals and cops
|
||||
for (uint32_t argIndex = 0; argIndex < filterCount; argIndex++)
|
||||
{
|
||||
// Pointer to ColArgs structure representing argIndex'th element in the BLOB
|
||||
auto args = reinterpret_cast<const ColArgs*>(filterString + (argIndex * filterSize));
|
||||
|
||||
ret->prestored_cops[argIndex] = args->COP;
|
||||
ret->prestored_rfs[argIndex] = args->rf;
|
||||
|
||||
if (datatypes::isUnsigned((execplan::CalpontSystemCatalog::ColDataType)colType))
|
||||
ret->storeFilterArg(argIndex, reinterpret_cast<const UT*>(args->val));
|
||||
else
|
||||
ret->storeFilterArg(argIndex, reinterpret_cast<const T*>(args->val));
|
||||
}
|
||||
|
||||
/* Decide which structure to use. I think the only cases where we can use the set
|
||||
are when NOPS > 1, BOP is OR, and every COP is ==,
|
||||
and when NOPS > 1, BOP is AND, and every COP is !=.
|
||||
|
||||
If there were no predicates that violate the condition for using a set,
|
||||
insert argVals into a set.
|
||||
*/
|
||||
if (filterCount > 1)
|
||||
{
|
||||
// Check that all COPs are of right kind that depends on BOP
|
||||
for (uint32_t argIndex = 0; argIndex < filterCount; argIndex++)
|
||||
{
|
||||
auto cop = ret->prestored_cops[argIndex];
|
||||
if (! ((BOP == BOP_OR && cop == COMPARE_EQ) ||
|
||||
(BOP == BOP_AND && cop == COMPARE_NE)))
|
||||
{
|
||||
goto skipConversion;
|
||||
}
|
||||
}
|
||||
|
||||
// Now we found that conversion is possible. Let's choose between array-based search
|
||||
// and set-based search depending on the set size.
|
||||
//TODO: Tailor the threshold based on the actual search algorithms used and WIDTH/SIMD_WIDTH
|
||||
if (filterCount <= ParsedColumnFilter::noSetFilterThreshold)
|
||||
{
|
||||
// Assign filter mode of array-based filtering
|
||||
if (BOP == BOP_OR)
|
||||
ret->columnFilterMode = ONE_OF_VALUES_IN_ARRAY;
|
||||
else
|
||||
ret->columnFilterMode = NONE_OF_VALUES_IN_ARRAY;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Assign filter mode of set-based filtering
|
||||
if (BOP == BOP_OR)
|
||||
ret->columnFilterMode = ONE_OF_VALUES_IN_SET;
|
||||
else
|
||||
ret->columnFilterMode = NONE_OF_VALUES_IN_SET;
|
||||
|
||||
ret->populatePrestoredSet<T>();
|
||||
}
|
||||
|
||||
skipConversion:;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
} //namespace primitives
|
||||
|
||||
#endif
|
||||
|
@ -169,6 +169,14 @@ public:
|
||||
{
|
||||
return doJoin;
|
||||
}
|
||||
primitives::PrimitiveProcessor& getPrimitiveProcessor()
|
||||
{
|
||||
return pp;
|
||||
}
|
||||
uint32_t getOutMsgSize() const
|
||||
{
|
||||
return outMsgSize;
|
||||
}
|
||||
private:
|
||||
BatchPrimitiveProcessor();
|
||||
BatchPrimitiveProcessor(const BatchPrimitiveProcessor&);
|
||||
|
@ -1,4 +1,5 @@
|
||||
/* Copyright (C) 2014 InfiniDB, Inc.
|
||||
Copyright (C) 2016-2021 MariaDB Corporation
|
||||
|
||||
This program is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU General Public License
|
||||
@ -249,7 +250,36 @@ void ColumnCommand::loadData()
|
||||
|
||||
void ColumnCommand::issuePrimitive()
|
||||
{
|
||||
_issuePrimitive();
|
||||
loadData();
|
||||
|
||||
if (!suppressFilter)
|
||||
bpp->getPrimitiveProcessor().setParsedColumnFilter(parsedColumnFilter);
|
||||
else
|
||||
bpp->getPrimitiveProcessor().setParsedColumnFilter(emptyFilter);
|
||||
|
||||
switch(colType.colWidth)
|
||||
{
|
||||
case 1:
|
||||
_issuePrimitive<1>();
|
||||
break;
|
||||
case 2:
|
||||
_issuePrimitive<2>();
|
||||
break;
|
||||
case 4:
|
||||
_issuePrimitive<4>();
|
||||
break;
|
||||
case 8:
|
||||
_issuePrimitive<8>();
|
||||
break;
|
||||
case 16:
|
||||
_issuePrimitive<16>();
|
||||
break;
|
||||
default:
|
||||
throw NotImplementedExcept(std::string("ColumnCommand::_issuePrimitive does not support ")
|
||||
+ std::to_string(colType.colWidth)
|
||||
+ std::string(" byte width."));
|
||||
}
|
||||
|
||||
if (_isScan)
|
||||
{
|
||||
if (LIKELY(colType.isNarrow()))
|
||||
@ -259,18 +289,29 @@ void ColumnCommand::issuePrimitive()
|
||||
}
|
||||
}
|
||||
|
||||
void ColumnCommand::_issuePrimitive()
|
||||
template<int W>
|
||||
void ColumnCommand::_issuePrimitiveNarrow()
|
||||
{
|
||||
uint32_t resultSize;
|
||||
|
||||
loadData();
|
||||
_loadData<W>();
|
||||
|
||||
if (!suppressFilter)
|
||||
bpp->pp.setParsedColumnFilter(parsedColumnFilter);
|
||||
bpp->getPrimitiveProcessor().setParsedColumnFilter(parsedColumnFilter);
|
||||
else
|
||||
bpp->pp.setParsedColumnFilter(emptyFilter);
|
||||
bpp->getPrimitiveProcessor().setParsedColumnFilter(emptyFilter);
|
||||
|
||||
bpp->pp.p_Col(primMsg, outMsg, bpp->outMsgSize, (unsigned int*)&resultSize);
|
||||
_issuePrimitive<W>();
|
||||
|
||||
if (_isScan)
|
||||
updateCPDataNarrow();
|
||||
}
|
||||
|
||||
|
||||
template<int W>
|
||||
void ColumnCommand::_issuePrimitive()
|
||||
{
|
||||
using IntegralType = typename datatypes::WidthToSIntegralType<W>::type;
|
||||
uint32_t resultSize;
|
||||
bpp->getPrimitiveProcessor().columnScanAndFilter<IntegralType>(primMsg, outMsg, bpp->getOutMsgSize(), (unsigned int*)&resultSize);
|
||||
} // _issuePrimitive()
|
||||
|
||||
void ColumnCommand::updateCPDataNarrow()
|
||||
@ -559,14 +600,33 @@ void ColumnCommand::createCommand(ByteStream& bs)
|
||||
deserializeInlineVector(bs, lastLbid);
|
||||
|
||||
Command::createCommand(bs);
|
||||
switch (colType.colWidth)
|
||||
{
|
||||
case 1:
|
||||
createColumnFilter<datatypes::WidthToSIntegralType<1>::type>();
|
||||
break;
|
||||
|
||||
parsedColumnFilter = primitives::parseColumnFilter(filterString.buf(), colType.colWidth,
|
||||
colType.colDataType, filterCount, BOP);
|
||||
case 2:
|
||||
createColumnFilter<datatypes::WidthToSIntegralType<2>::type>();
|
||||
break;
|
||||
|
||||
/* OR hack */
|
||||
emptyFilter = primitives::parseColumnFilter(filterString.buf(), colType.colWidth,
|
||||
colType.colDataType, 0, BOP);
|
||||
case 4:
|
||||
createColumnFilter<datatypes::WidthToSIntegralType<4>::type>();
|
||||
break;
|
||||
|
||||
case 8:
|
||||
createColumnFilter<datatypes::WidthToSIntegralType<8>::type>();
|
||||
break;
|
||||
|
||||
case 16:
|
||||
createColumnFilter<datatypes::WidthToSIntegralType<16>::type>();
|
||||
break;
|
||||
|
||||
default:
|
||||
throw NotImplementedExcept(std::string("ColumnCommand::createCommand does not support ")
|
||||
+ std::to_string(colType.colWidth)
|
||||
+ std::string(" byte width."));
|
||||
}
|
||||
}
|
||||
|
||||
void ColumnCommand::createCommand(execplan::CalpontSystemCatalog::ColType& aColType, ByteStream& bs)
|
||||
@ -1023,6 +1083,18 @@ ColumnCommand* ColumnCommandFabric::createCommand(messageqcpp::ByteStream& bs)
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
void ColumnCommand::createColumnFilter()
|
||||
{
|
||||
parsedColumnFilter = primitives::_parseColumnFilter<T>(filterString.buf(),
|
||||
colType.colDataType,
|
||||
filterCount, BOP);
|
||||
/* OR hack */
|
||||
emptyFilter = primitives::_parseColumnFilter<T>(filterString.buf(),
|
||||
colType.colDataType,
|
||||
0, BOP);
|
||||
}
|
||||
|
||||
ColumnCommand* ColumnCommandFabric::duplicate(const ColumnCommandUniquePtr& rhs)
|
||||
{
|
||||
auto & command = *rhs;
|
||||
@ -1066,12 +1138,7 @@ ColumnCommand* ColumnCommandFabric::duplicate(const ColumnCommandUniquePtr& rhs)
|
||||
ColumnCommandInt8::ColumnCommandInt8(execplan::CalpontSystemCatalog::ColType& aColType, messageqcpp::ByteStream& bs)
|
||||
{
|
||||
ColumnCommand::createCommand(aColType, bs);
|
||||
parsedColumnFilter = primitives::parseColumnFilter(filterString.buf(), colType.colWidth,
|
||||
colType.colDataType, filterCount, BOP);
|
||||
|
||||
/* OR hack */
|
||||
emptyFilter = primitives::parseColumnFilter(filterString.buf(), colType.colWidth,
|
||||
colType.colDataType, 0, BOP);
|
||||
createColumnFilter<IntegralType>();
|
||||
}
|
||||
|
||||
void ColumnCommandInt8::prep(int8_t outputType, bool absRids)
|
||||
@ -1104,17 +1171,15 @@ void ColumnCommandInt8::projectResultRG(RowGroup& rg, uint32_t pos)
|
||||
_projectResultRG<size>(rg, pos);
|
||||
}
|
||||
|
||||
void ColumnCommandInt8::issuePrimitive()
|
||||
{
|
||||
_issuePrimitiveNarrow<size>();
|
||||
}
|
||||
|
||||
ColumnCommandInt16::ColumnCommandInt16(execplan::CalpontSystemCatalog::ColType& aColType, messageqcpp::ByteStream& bs)
|
||||
{
|
||||
ColumnCommand::createCommand(aColType, bs);
|
||||
parsedColumnFilter = primitives::parseColumnFilter(filterString.buf(),
|
||||
colType.colWidth,
|
||||
colType.colDataType,
|
||||
filterCount, BOP);
|
||||
/* OR hack */
|
||||
emptyFilter = primitives::parseColumnFilter(filterString.buf(),
|
||||
colType.colWidth,
|
||||
colType.colDataType, 0, BOP);
|
||||
createColumnFilter<IntegralType>();
|
||||
}
|
||||
|
||||
void ColumnCommandInt16::prep(int8_t outputType, bool absRids)
|
||||
@ -1147,15 +1212,15 @@ void ColumnCommandInt16::projectResultRG(RowGroup& rg, uint32_t pos)
|
||||
_projectResultRG<size>(rg, pos);
|
||||
}
|
||||
|
||||
void ColumnCommandInt16::issuePrimitive()
|
||||
{
|
||||
_issuePrimitiveNarrow<size>();
|
||||
}
|
||||
|
||||
ColumnCommandInt32::ColumnCommandInt32(execplan::CalpontSystemCatalog::ColType& aColType, messageqcpp::ByteStream& bs)
|
||||
{
|
||||
ColumnCommand::createCommand(aColType, bs);
|
||||
parsedColumnFilter = primitives::parseColumnFilter(filterString.buf(), colType.colWidth,
|
||||
colType.colDataType, filterCount, BOP);
|
||||
|
||||
/* OR hack */
|
||||
emptyFilter = primitives::parseColumnFilter(filterString.buf(), colType.colWidth,
|
||||
colType.colDataType, 0, BOP);
|
||||
createColumnFilter<IntegralType>();
|
||||
}
|
||||
|
||||
void ColumnCommandInt32::prep(int8_t outputType, bool absRids)
|
||||
@ -1188,15 +1253,15 @@ void ColumnCommandInt32::projectResultRG(RowGroup& rg, uint32_t pos)
|
||||
_projectResultRG<size>(rg, pos);
|
||||
}
|
||||
|
||||
void ColumnCommandInt32::issuePrimitive()
|
||||
{
|
||||
_issuePrimitiveNarrow<size>();
|
||||
}
|
||||
|
||||
ColumnCommandInt64::ColumnCommandInt64(execplan::CalpontSystemCatalog::ColType& aColType, messageqcpp::ByteStream& bs)
|
||||
{
|
||||
ColumnCommand::createCommand(aColType, bs);
|
||||
parsedColumnFilter = primitives::parseColumnFilter(filterString.buf(), colType.colWidth,
|
||||
colType.colDataType, filterCount, BOP);
|
||||
|
||||
/* OR hack */
|
||||
emptyFilter = primitives::parseColumnFilter(filterString.buf(), colType.colWidth,
|
||||
colType.colDataType, 0, BOP);
|
||||
createColumnFilter<IntegralType>();
|
||||
}
|
||||
|
||||
void ColumnCommandInt64::prep(int8_t outputType, bool absRids)
|
||||
@ -1229,17 +1294,15 @@ void ColumnCommandInt64::projectResultRG(RowGroup& rg, uint32_t pos)
|
||||
_projectResultRG<size>(rg, pos);
|
||||
}
|
||||
|
||||
void ColumnCommandInt64::issuePrimitive()
|
||||
{
|
||||
_issuePrimitiveNarrow<size>();
|
||||
}
|
||||
|
||||
ColumnCommandInt128::ColumnCommandInt128(execplan::CalpontSystemCatalog::ColType& aColType, messageqcpp::ByteStream& bs)
|
||||
{
|
||||
ColumnCommand::createCommand(aColType, bs);
|
||||
parsedColumnFilter = primitives::parseColumnFilter(filterString.buf(),
|
||||
colType.colWidth,
|
||||
colType.colDataType,
|
||||
filterCount, BOP);
|
||||
/* OR hack */
|
||||
emptyFilter = primitives::parseColumnFilter(filterString.buf(),
|
||||
colType.colWidth,
|
||||
colType.colDataType, 0, BOP);
|
||||
createColumnFilter<IntegralType>();
|
||||
}
|
||||
|
||||
void ColumnCommandInt128::prep(int8_t outputType, bool absRids)
|
||||
@ -1272,5 +1335,19 @@ void ColumnCommandInt128::projectResultRG(RowGroup& rg, uint32_t pos)
|
||||
_projectResultRG<size>(rg, pos);
|
||||
}
|
||||
|
||||
void ColumnCommandInt128::issuePrimitive()
|
||||
{
|
||||
loadData();
|
||||
|
||||
if (!suppressFilter)
|
||||
bpp->getPrimitiveProcessor().setParsedColumnFilter(parsedColumnFilter);
|
||||
else
|
||||
bpp->getPrimitiveProcessor().setParsedColumnFilter(emptyFilter);
|
||||
|
||||
_issuePrimitive<size>();
|
||||
if (_isScan)
|
||||
updateCPDataWide();
|
||||
}
|
||||
|
||||
}
|
||||
// vim:ts=4 sw=4:
|
||||
|
@ -113,9 +113,15 @@ protected:
|
||||
void _loadData();
|
||||
void updateCPDataNarrow();
|
||||
void updateCPDataWide();
|
||||
template<int W>
|
||||
void _issuePrimitiveNarrow();
|
||||
template<int W>
|
||||
void _issuePrimitive();
|
||||
virtual void issuePrimitive();
|
||||
void duplicate(ColumnCommand*);
|
||||
void fillInPrimitiveMessageHeader(const int8_t outputType, const bool absRids);
|
||||
template<typename T>
|
||||
void createColumnFilter();
|
||||
|
||||
// we only care about the width and type fields.
|
||||
//On the PM the rest is uninitialized
|
||||
@ -125,7 +131,6 @@ protected:
|
||||
ColumnCommand& operator=(const ColumnCommand&);
|
||||
|
||||
void _execute();
|
||||
void issuePrimitive();
|
||||
void processResult();
|
||||
template<int W>
|
||||
void _process_OT_BOTH();
|
||||
@ -202,6 +207,7 @@ class ColumnCommandInt8 : public ColumnCommand
|
||||
{
|
||||
public:
|
||||
static constexpr uint8_t size = 1;
|
||||
using IntegralType = datatypes::WidthToSIntegralType<size>::type;
|
||||
ColumnCommandInt8() : ColumnCommand() { };
|
||||
ColumnCommandInt8(execplan::CalpontSystemCatalog::ColType& colType, messageqcpp::ByteStream& bs);
|
||||
void prep(int8_t outputType, bool absRids) override;
|
||||
@ -209,12 +215,14 @@ class ColumnCommandInt8 : public ColumnCommand
|
||||
void process_OT_BOTH() override;
|
||||
void process_OT_DATAVALUE() override;
|
||||
void projectResultRG(rowgroup::RowGroup& rg, uint32_t pos) override;
|
||||
void issuePrimitive() override;
|
||||
};
|
||||
|
||||
class ColumnCommandInt16 : public ColumnCommand
|
||||
{
|
||||
public:
|
||||
static constexpr uint8_t size = 2;
|
||||
using IntegralType = datatypes::WidthToSIntegralType<size>::type;
|
||||
ColumnCommandInt16() : ColumnCommand() { };
|
||||
ColumnCommandInt16(execplan::CalpontSystemCatalog::ColType& colType, messageqcpp::ByteStream& bs);
|
||||
void prep(int8_t outputType, bool absRids) override;
|
||||
@ -222,12 +230,14 @@ class ColumnCommandInt16 : public ColumnCommand
|
||||
void process_OT_BOTH() override;
|
||||
void process_OT_DATAVALUE() override;
|
||||
void projectResultRG(rowgroup::RowGroup& rg, uint32_t pos) override;
|
||||
void issuePrimitive() override;
|
||||
};
|
||||
|
||||
class ColumnCommandInt32 : public ColumnCommand
|
||||
{
|
||||
public:
|
||||
static constexpr uint8_t size = 4;
|
||||
using IntegralType = datatypes::WidthToSIntegralType<size>::type;
|
||||
ColumnCommandInt32() : ColumnCommand() { };
|
||||
ColumnCommandInt32(execplan::CalpontSystemCatalog::ColType& colType, messageqcpp::ByteStream& bs);
|
||||
void prep(int8_t outputType, bool absRids) override;
|
||||
@ -235,12 +245,14 @@ class ColumnCommandInt32 : public ColumnCommand
|
||||
void process_OT_BOTH() override;
|
||||
void process_OT_DATAVALUE() override;
|
||||
void projectResultRG(rowgroup::RowGroup& rg, uint32_t pos) override;
|
||||
void issuePrimitive() override;
|
||||
};
|
||||
|
||||
class ColumnCommandInt64 : public ColumnCommand
|
||||
{
|
||||
public:
|
||||
static constexpr uint8_t size = 8;
|
||||
using IntegralType = datatypes::WidthToSIntegralType<size>::type;
|
||||
ColumnCommandInt64() : ColumnCommand() { };
|
||||
ColumnCommandInt64(execplan::CalpontSystemCatalog::ColType& colType, messageqcpp::ByteStream& bs);
|
||||
void prep(int8_t outputType, bool absRids) override;
|
||||
@ -248,12 +260,14 @@ class ColumnCommandInt64 : public ColumnCommand
|
||||
void process_OT_BOTH() override;
|
||||
void process_OT_DATAVALUE() override;
|
||||
void projectResultRG(rowgroup::RowGroup& rg, uint32_t pos) override;
|
||||
void issuePrimitive() override;
|
||||
};
|
||||
|
||||
class ColumnCommandInt128 : public ColumnCommand
|
||||
{
|
||||
public:
|
||||
static constexpr uint8_t size = 16;
|
||||
using IntegralType = datatypes::WidthToSIntegralType<size>::type;
|
||||
ColumnCommandInt128() : ColumnCommand() { };
|
||||
ColumnCommandInt128(execplan::CalpontSystemCatalog::ColType& colType, messageqcpp::ByteStream& bs);
|
||||
void prep(int8_t outputType, bool absRids) override;
|
||||
@ -261,6 +275,7 @@ class ColumnCommandInt128 : public ColumnCommand
|
||||
void process_OT_BOTH() override;
|
||||
void process_OT_DATAVALUE() override;
|
||||
void projectResultRG(rowgroup::RowGroup& rg, uint32_t pos) override;
|
||||
void issuePrimitive() override;
|
||||
};
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user