You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
Merge pull request #2052 from drrtuy/MCOL-4815
MCOL-4815 ColumnCommand was replaced with a set of derived classes sp…
This commit is contained in:
@ -129,6 +129,25 @@ template<> struct make_unsigned<int128_t> { typedef uint128_t type; };
|
||||
namespace datatypes
|
||||
{
|
||||
|
||||
template<int W, typename T = void>
|
||||
struct _WidthToSIntegralType
|
||||
{
|
||||
typedef T type;
|
||||
};
|
||||
|
||||
template <int W>
|
||||
struct WidthToSIntegralType: _WidthToSIntegralType<W> { };
|
||||
template <>
|
||||
struct WidthToSIntegralType<1>: _WidthToSIntegralType<1, int8_t> { };
|
||||
template <>
|
||||
struct WidthToSIntegralType<2>: _WidthToSIntegralType<2, int16_t> { };
|
||||
template <>
|
||||
struct WidthToSIntegralType<4>: _WidthToSIntegralType<4, int32_t> { };
|
||||
template <>
|
||||
struct WidthToSIntegralType<8>: _WidthToSIntegralType<8, int64_t> { };
|
||||
template <>
|
||||
struct WidthToSIntegralType<16>: _WidthToSIntegralType<16, int128_t> { };
|
||||
|
||||
class SystemCatalog
|
||||
{
|
||||
public:
|
||||
@ -294,6 +313,16 @@ public:
|
||||
colWidth == MAXDECIMALWIDTH;
|
||||
}
|
||||
|
||||
inline bool isWide() const
|
||||
{
|
||||
return colWidth > static_cast<int32_t>(MAXLEGACYWIDTH);
|
||||
}
|
||||
|
||||
inline bool isNarrow() const
|
||||
{
|
||||
return colWidth <= static_cast<int32_t>(MAXLEGACYWIDTH);
|
||||
}
|
||||
|
||||
bool isUnsignedInteger() const
|
||||
{
|
||||
switch (colDataType)
|
||||
|
@ -133,37 +133,19 @@ ColumnCommandJL::~ColumnCommandJL()
|
||||
{
|
||||
}
|
||||
|
||||
// The other leg is in PP, its name is ColumnCommand::makeCommand.
|
||||
void ColumnCommandJL::createCommand(ByteStream& bs) const
|
||||
{
|
||||
bs << (uint8_t) COLUMN_COMMAND;
|
||||
colType.serialize(bs);
|
||||
bs << (uint8_t) isScan;
|
||||
// cout << "sending lbid " << lbid << endl;
|
||||
bs << traceFlags;
|
||||
bs << filterString;
|
||||
#if 0
|
||||
cout << "filter string: ";
|
||||
|
||||
for (uint32_t i = 0; i < filterString.length(); ++i)
|
||||
cout << (int) filterString.buf()[i] << " ";
|
||||
|
||||
cout << endl;
|
||||
#endif
|
||||
bs << (uint8_t) colType.colDataType;
|
||||
bs << (uint8_t) colType.colWidth;
|
||||
bs << (uint8_t) colType.scale;
|
||||
bs << (uint8_t) colType.compressionType;
|
||||
bs << (uint32_t) colType.charsetNumber;
|
||||
bs << BOP;
|
||||
bs << filterCount;
|
||||
serializeInlineVector(bs, fLastLbid);
|
||||
//bs << (uint64_t)fLastLbid;
|
||||
|
||||
CommandJL::createCommand(bs);
|
||||
|
||||
/* XXXPAT: for debugging only, we can get rid of this one */
|
||||
// bs << colType.columnOID;
|
||||
|
||||
// cout << "filter Count is " << filterCount << " ----" << endl;
|
||||
}
|
||||
|
||||
void ColumnCommandJL::runCommand(ByteStream& bs) const
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -34,16 +34,18 @@
|
||||
#include "command.h"
|
||||
#include "calpontsystemcatalog.h"
|
||||
|
||||
using CSCDataType = execplan::CalpontSystemCatalog::ColDataType;
|
||||
|
||||
namespace primitiveprocessor
|
||||
{
|
||||
|
||||
// #warning got the ColumnCommand definition
|
||||
// Warning. As of 6.1.1 ColumnCommand has some code duplication.
|
||||
// There are number of derived classes specialized by column width.
|
||||
// There are also legacy generic CC methods used by PseudoCC.
|
||||
class ColumnCommand : public Command
|
||||
{
|
||||
public:
|
||||
ColumnCommand();
|
||||
ColumnCommand(execplan::CalpontSystemCatalog::ColType& aColType);
|
||||
virtual ~ColumnCommand();
|
||||
|
||||
inline uint64_t getLBID()
|
||||
@ -69,7 +71,7 @@ public:
|
||||
|
||||
void execute();
|
||||
void execute(int64_t* vals); //used by RTSCommand to redirect values
|
||||
void prep(int8_t outputType, bool makeAbsRids);
|
||||
virtual void prep(int8_t outputType, bool absRids);
|
||||
void project();
|
||||
void projectIntoRowGroup(rowgroup::RowGroup& rg, uint32_t pos);
|
||||
void nextLBID();
|
||||
@ -78,6 +80,7 @@ public:
|
||||
return _isScan;
|
||||
}
|
||||
void createCommand(messageqcpp::ByteStream&);
|
||||
void createCommand(execplan::CalpontSystemCatalog::ColType& aColType, messageqcpp::ByteStream&);
|
||||
void resetCommand(messageqcpp::ByteStream&);
|
||||
void setMakeAbsRids(bool m)
|
||||
{
|
||||
@ -106,25 +109,43 @@ public:
|
||||
|
||||
protected:
|
||||
virtual void loadData();
|
||||
template <int W>
|
||||
void _loadData();
|
||||
void updateCPDataNarrow();
|
||||
void updateCPDataWide();
|
||||
void _issuePrimitive();
|
||||
void duplicate(ColumnCommand*);
|
||||
void fillInPrimitiveMessageHeader(const int8_t outputType, const bool absRids);
|
||||
|
||||
// we only care about the width and type fields.
|
||||
//On the PM the rest is uninitialized
|
||||
execplan::CalpontSystemCatalog::ColType colType;
|
||||
|
||||
private:
|
||||
ColumnCommand(const ColumnCommand&);
|
||||
ColumnCommand& operator=(const ColumnCommand&);
|
||||
|
||||
void _execute();
|
||||
void issuePrimitive();
|
||||
void processResult();
|
||||
void process_OT_BOTH();
|
||||
template<int W>
|
||||
void _process_OT_BOTH();
|
||||
template<int W>
|
||||
void _process_OT_BOTH_wAbsRids();
|
||||
virtual void process_OT_BOTH();
|
||||
void process_OT_RID();
|
||||
void process_OT_DATAVALUE();
|
||||
virtual void process_OT_DATAVALUE();
|
||||
template<int W>
|
||||
void _process_OT_DATAVALUE();
|
||||
void process_OT_ROWGROUP();
|
||||
void projectResult();
|
||||
void projectResultRG(rowgroup::RowGroup& rg, uint32_t pos);
|
||||
template<int W>
|
||||
void _projectResultRGLoop(rowgroup::Row& r,
|
||||
uint8_t* msg8,
|
||||
const uint32_t gapSize,
|
||||
const uint32_t offset);
|
||||
template<int W>
|
||||
void _projectResultRG(rowgroup::RowGroup& rg, uint32_t pos);
|
||||
virtual void projectResultRG(rowgroup::RowGroup& rg, uint32_t pos);
|
||||
void removeRowsFromRowGroup(rowgroup::RowGroup&);
|
||||
void makeScanMsg();
|
||||
void makeStepMsg();
|
||||
@ -175,6 +196,82 @@ private:
|
||||
friend class RTSCommand;
|
||||
};
|
||||
|
||||
using ColumnCommandUniquePtr = std::unique_ptr<ColumnCommand>;
|
||||
|
||||
class ColumnCommandInt8 : public ColumnCommand
|
||||
{
|
||||
public:
|
||||
static constexpr uint8_t size = 1;
|
||||
ColumnCommandInt8() : ColumnCommand() { };
|
||||
ColumnCommandInt8(execplan::CalpontSystemCatalog::ColType& colType, messageqcpp::ByteStream& bs);
|
||||
void prep(int8_t outputType, bool absRids) override;
|
||||
void loadData() override;
|
||||
void process_OT_BOTH() override;
|
||||
void process_OT_DATAVALUE() override;
|
||||
void projectResultRG(rowgroup::RowGroup& rg, uint32_t pos) override;
|
||||
};
|
||||
|
||||
class ColumnCommandInt16 : public ColumnCommand
|
||||
{
|
||||
public:
|
||||
static constexpr uint8_t size = 2;
|
||||
ColumnCommandInt16() : ColumnCommand() { };
|
||||
ColumnCommandInt16(execplan::CalpontSystemCatalog::ColType& colType, messageqcpp::ByteStream& bs);
|
||||
void prep(int8_t outputType, bool absRids) override;
|
||||
void loadData() override;
|
||||
void process_OT_BOTH() override;
|
||||
void process_OT_DATAVALUE() override;
|
||||
void projectResultRG(rowgroup::RowGroup& rg, uint32_t pos) override;
|
||||
};
|
||||
|
||||
class ColumnCommandInt32 : public ColumnCommand
|
||||
{
|
||||
public:
|
||||
static constexpr uint8_t size = 4;
|
||||
ColumnCommandInt32() : ColumnCommand() { };
|
||||
ColumnCommandInt32(execplan::CalpontSystemCatalog::ColType& colType, messageqcpp::ByteStream& bs);
|
||||
void prep(int8_t outputType, bool absRids) override;
|
||||
void loadData() override;
|
||||
void process_OT_BOTH() override;
|
||||
void process_OT_DATAVALUE() override;
|
||||
void projectResultRG(rowgroup::RowGroup& rg, uint32_t pos) override;
|
||||
};
|
||||
|
||||
class ColumnCommandInt64 : public ColumnCommand
|
||||
{
|
||||
public:
|
||||
static constexpr uint8_t size = 8;
|
||||
ColumnCommandInt64() : ColumnCommand() { };
|
||||
ColumnCommandInt64(execplan::CalpontSystemCatalog::ColType& colType, messageqcpp::ByteStream& bs);
|
||||
void prep(int8_t outputType, bool absRids) override;
|
||||
void loadData() override;
|
||||
void process_OT_BOTH() override;
|
||||
void process_OT_DATAVALUE() override;
|
||||
void projectResultRG(rowgroup::RowGroup& rg, uint32_t pos) override;
|
||||
};
|
||||
|
||||
class ColumnCommandInt128 : public ColumnCommand
|
||||
{
|
||||
public:
|
||||
static constexpr uint8_t size = 16;
|
||||
ColumnCommandInt128() : ColumnCommand() { };
|
||||
ColumnCommandInt128(execplan::CalpontSystemCatalog::ColType& colType, messageqcpp::ByteStream& bs);
|
||||
void prep(int8_t outputType, bool absRids) override;
|
||||
void loadData() override;
|
||||
void process_OT_BOTH() override;
|
||||
void process_OT_DATAVALUE() override;
|
||||
void projectResultRG(rowgroup::RowGroup& rg, uint32_t pos) override;
|
||||
};
|
||||
|
||||
|
||||
class ColumnCommandFabric
|
||||
{
|
||||
public:
|
||||
ColumnCommandFabric() = default;
|
||||
static ColumnCommand* createCommand(messageqcpp::ByteStream& bs);
|
||||
static ColumnCommand* duplicate(const ColumnCommandUniquePtr& rhs);
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
inline void ColumnCommand::fillEmptyBlock(uint8_t* dst,
|
||||
const uint8_t*emptyValue,
|
||||
@ -203,8 +300,7 @@ inline void ColumnCommand::fillEmptyBlock<messageqcpp::ByteStream::hexbyte>(uint
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
} // namespace
|
||||
|
||||
#endif
|
||||
// vim:ts=4 sw=4:
|
||||
|
@ -53,7 +53,7 @@ Command* Command::makeCommand(ByteStream& bs, CommandType* type, vector<SCommand
|
||||
switch (*type)
|
||||
{
|
||||
case COLUMN_COMMAND:
|
||||
ret = new ColumnCommand();
|
||||
return ColumnCommandFabric::createCommand(bs);
|
||||
break;
|
||||
|
||||
case DICT_STEP:
|
||||
|
@ -81,13 +81,13 @@ void RTSCommand::project()
|
||||
if (bpp->absRids.get() == NULL)
|
||||
bpp->absRids.reset(new uint64_t[LOGICAL_BLOCK_RIDS]);
|
||||
|
||||
col.execute(tmpValues);
|
||||
col->execute(tmpValues);
|
||||
|
||||
if (old_rc != bpp->ridCount)
|
||||
{
|
||||
ostringstream os;
|
||||
|
||||
os << __FILE__ << " (token column) error on projection for oid " << col.getOID() << " lbid " << col.getLBID();
|
||||
os << __FILE__ << " (token column) error on projection for oid " << col->getOID() << " lbid " << col->getLBID();
|
||||
os << ": input rids " << old_rc;
|
||||
os << ", output rids " << bpp->ridCount << endl;
|
||||
|
||||
@ -127,14 +127,14 @@ void RTSCommand::projectIntoRowGroup(RowGroup& rg, uint32_t colNum)
|
||||
if (bpp->absRids.get() == NULL)
|
||||
bpp->absRids.reset(new uint64_t[LOGICAL_BLOCK_RIDS]);
|
||||
|
||||
col.execute(tmpValues);
|
||||
col->execute(tmpValues);
|
||||
|
||||
if (old_rc != bpp->ridCount)
|
||||
{
|
||||
|
||||
ostringstream os;
|
||||
|
||||
os << __FILE__ << " (token column) error on projection for oid " << col.getOID() << " lbid " << col.getLBID();
|
||||
os << __FILE__ << " (token column) error on projection for oid " << col->getOID() << " lbid " << col->getLBID();
|
||||
os << ": input rids " << old_rc;
|
||||
os << ", output rids " << bpp->ridCount << endl;
|
||||
|
||||
@ -151,7 +151,7 @@ void RTSCommand::projectIntoRowGroup(RowGroup& rg, uint32_t colNum)
|
||||
uint64_t RTSCommand::getLBID()
|
||||
{
|
||||
if (!passThru)
|
||||
return col.getLBID();
|
||||
return col->getLBID();
|
||||
else
|
||||
return 0;
|
||||
}
|
||||
@ -159,13 +159,13 @@ uint64_t RTSCommand::getLBID()
|
||||
void RTSCommand::nextLBID()
|
||||
{
|
||||
if (!passThru)
|
||||
col.nextLBID();
|
||||
col->nextLBID();
|
||||
}
|
||||
|
||||
void RTSCommand::prep(int8_t outputType, bool makeAbsRids)
|
||||
{
|
||||
if (!passThru)
|
||||
col.prep(OT_BOTH, true);
|
||||
col->prep(OT_BOTH, true);
|
||||
|
||||
dict.prep(OT_DATAVALUE, true);
|
||||
}
|
||||
@ -177,7 +177,7 @@ void RTSCommand::createCommand(ByteStream& bs)
|
||||
|
||||
if (!passThru)
|
||||
{
|
||||
col.createCommand(bs);
|
||||
col.reset(ColumnCommandFabric::createCommand(bs));
|
||||
}
|
||||
|
||||
dict.createCommand(bs);
|
||||
@ -187,7 +187,7 @@ void RTSCommand::createCommand(ByteStream& bs)
|
||||
void RTSCommand::resetCommand(ByteStream& bs)
|
||||
{
|
||||
if (!passThru)
|
||||
col.resetCommand(bs);
|
||||
col->resetCommand(bs);
|
||||
|
||||
dict.resetCommand(bs);
|
||||
}
|
||||
@ -202,7 +202,7 @@ SCommand RTSCommand::duplicate()
|
||||
rts->passThru = passThru;
|
||||
|
||||
if (!passThru)
|
||||
rts->col = col;
|
||||
rts->col.reset(ColumnCommandFabric::duplicate(col));
|
||||
|
||||
rts->dict = dict;
|
||||
rts->Command::duplicate(this);
|
||||
@ -237,7 +237,7 @@ void RTSCommand::getLBIDList(uint32_t loopCount, vector<int64_t>* lbids)
|
||||
dict.getLBIDList(loopCount, lbids);
|
||||
|
||||
if (!passThru)
|
||||
col.getLBIDList(loopCount, lbids);
|
||||
col->getLBIDList(loopCount, lbids);
|
||||
}
|
||||
|
||||
void RTSCommand::setBatchPrimitiveProcessor(BatchPrimitiveProcessor* b)
|
||||
@ -246,7 +246,7 @@ void RTSCommand::setBatchPrimitiveProcessor(BatchPrimitiveProcessor* b)
|
||||
dict.setBatchPrimitiveProcessor(b);
|
||||
|
||||
if (!passThru)
|
||||
col.setBatchPrimitiveProcessor(b);
|
||||
col->setBatchPrimitiveProcessor(b);
|
||||
}
|
||||
|
||||
};
|
||||
|
@ -33,10 +33,12 @@
|
||||
|
||||
#include "command.h"
|
||||
#include <boost/scoped_ptr.hpp>
|
||||
#include <memory>
|
||||
|
||||
namespace primitiveprocessor
|
||||
{
|
||||
|
||||
|
||||
class RTSCommand : public Command
|
||||
{
|
||||
public:
|
||||
@ -76,7 +78,7 @@ public:
|
||||
private:
|
||||
RTSCommand(const RTSCommand&);
|
||||
|
||||
ColumnCommand col;
|
||||
ColumnCommandUniquePtr col;
|
||||
DictStep dict;
|
||||
uint8_t passThru;
|
||||
bool absNull;
|
||||
|
@ -471,6 +471,26 @@ private:
|
||||
uint32_t fMaxLen; //how big fBuf is currently
|
||||
};
|
||||
|
||||
template<int W, typename T = void>
|
||||
struct _ByteStreamType
|
||||
{
|
||||
typedef T type;
|
||||
};
|
||||
|
||||
template <int W>
|
||||
struct ByteStreamType: _ByteStreamType<W> { };
|
||||
|
||||
template <>
|
||||
struct ByteStreamType<1>: _ByteStreamType<1, ByteStream::byte> { };
|
||||
template <>
|
||||
struct ByteStreamType<2>: _ByteStreamType<2, ByteStream::doublebyte> { };
|
||||
template <>
|
||||
struct ByteStreamType<4>: _ByteStreamType<4, ByteStream::quadbyte> { };
|
||||
template <>
|
||||
struct ByteStreamType<8>: _ByteStreamType<8, ByteStream::octbyte> { };
|
||||
template <>
|
||||
struct ByteStreamType<16>: _ByteStreamType<16, ByteStream::hexbyte> { };
|
||||
|
||||
// type descriptors to let ByteStream point out protocol errors, WIP
|
||||
static const uint8_t BS_UINT8 = 0;
|
||||
static const uint8_t BS_UINT16 = 1;
|
||||
@ -704,6 +724,12 @@ void deserializeSet(ByteStream& bs, std::set<T>& s)
|
||||
s.insert(tmp);
|
||||
}
|
||||
}
|
||||
/*
|
||||
template<>
|
||||
struct ByteStream::_ByteStreamType<1, ByteStream::byte>>
|
||||
{
|
||||
typedef ByteStream::byte type;
|
||||
}*/
|
||||
|
||||
}//namespace messageqcpp
|
||||
|
||||
|
Reference in New Issue
Block a user