/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // // $Id: columncommand.h 2057 2013-02-13 17:00:10Z pleblanc $ // C++ Interface: columncommand // // Description: // // // Author: Patrick LeBlanc , (C) 2008 // // Copyright: See COPYING file that comes with this distribution // // #pragma once #include #include "columnwidth.h" #include "command.h" #include "calpontsystemcatalog.h" namespace primitiveprocessor { // Warning. As of 6.1.1 ColumnCommand has some code duplication. // There are number of derived classes specialized by column width. // There are also legacy generic CC methods used by PseudoCC. class ColumnCommand : public Command { public: ColumnCommand(); ColumnCommand(execplan::CalpontSystemCatalog::ColType& aColType); ~ColumnCommand() override; inline uint64_t getLBID() override { return lbid; } inline uint8_t getWidth() { return colType.colWidth; } inline uint8_t getScale() { return colType.scale; } uint16_t getFilterCount() { return filterCount; } const execplan::CalpontSystemCatalog::ColType& getColType() { return colType; } void execute() override; void execute(int64_t* vals); // used by RTSCommand to redirect values void prep(int8_t outputType, bool absRids) override; void project() override; void projectIntoRowGroup(rowgroup::RowGroup& rg, uint32_t pos) override; void nextLBID() override; bool isScan() { return _isScan; } bool hasAuxCol() const { return hasAuxCol_; } uint64_t getLBIDAux() const { return lbidAux; } void createCommand(messageqcpp::ByteStream&) override; void createCommand(execplan::CalpontSystemCatalog::ColType& aColType, messageqcpp::ByteStream&); void resetCommand(messageqcpp::ByteStream&) override; void setMakeAbsRids(bool m) override { makeAbsRids = m; } bool willPrefetch(); int64_t getLastLbid(); void getLBIDList(uint32_t loopCount, std::vector* lbids) override; SCommand duplicate() override; bool operator==(const ColumnCommand&) const; bool operator!=(const ColumnCommand&) const; /* OR hacks */ void setScan(bool b) { _isScan = b; } void disableFilters(); void enableFilters(); int getCompType() const override { return colType.compressionType; } protected: virtual void loadData(); template void _loadData(); void updateCPDataNarrow(); void updateCPDataWide(); template void _issuePrimitiveNarrow(); template void _issuePrimitive(); virtual void issuePrimitive(); void duplicate(ColumnCommand*); void fillInPrimitiveMessageHeader(const int8_t outputType, const bool absRids); template void createColumnFilter(); // we only care about the width and type fields. // On the PM the rest is uninitialized execplan::CalpontSystemCatalog::ColType colType; ColumnCommand(const ColumnCommand&); ColumnCommand& operator=(const ColumnCommand&); void _execute(); void processResult(); template void _process_OT_BOTH(); template void _process_OT_BOTH_wAbsRids(); virtual void process_OT_BOTH(); void process_OT_RID(); virtual void process_OT_DATAVALUE(); template void _process_OT_DATAVALUE(); void process_OT_ROWGROUP(); void projectResult(); template void _projectResultRGLoop(rowgroup::Row& r, const T* valuesArray, const uint32_t offset); template void _projectResultRG(rowgroup::RowGroup& rg, uint32_t pos); virtual void projectResultRG(rowgroup::RowGroup& rg, uint32_t pos); void removeRowsFromRowGroup(rowgroup::RowGroup&); void makeScanMsg(); void makeStepMsg(); void setLBID(uint64_t rid); template inline void fillEmptyBlock(uint8_t* dst, const uint8_t* emptyValue, const uint32_t number) const; bool _isScan; std::unique_ptr inputMsg; NewColRequestHeader* primMsg; ColResultHeader* outMsg; // the length of base prim msg, which is everything up to the // rid array for the pCol message // !!! This attribute is used to store a sum which arg type is potentially uint64_t. // As of 23.02.10 uint32_t here is always enough for the purpose of this attribute though. uint32_t baseMsgLength; uint64_t lbid; bool hasAuxCol_; uint64_t lbidAux; uint32_t traceFlags; // probably move this to Command uint8_t BOP; messageqcpp::ByteStream filterString; uint16_t filterCount; bool makeAbsRids; int64_t* values; // this is usually bpp->values; RTSCommand needs to use a different container int128_t* wide128Values; uint16_t mask, shift; // vars for the selective block loader // counters to decide whether to prefetch or not uint32_t blockCount, loadCount; boost::shared_ptr parsedColumnFilter; /* OR hacks */ boost::shared_ptr emptyFilter; bool suppressFilter; std::vector lastLbid; /* speculative optimizations for projectintorowgroup() */ rowgroup::Row r; uint32_t rowSize; bool wasVersioned; friend class RTSCommand; }; using ColumnCommandUniquePtr = std::unique_ptr; class ColumnCommandInt8 : public ColumnCommand { public: static constexpr uint8_t size = 1; using IntegralType = datatypes::WidthToSIntegralType::type; ColumnCommandInt8() : ColumnCommand(){}; ColumnCommandInt8(execplan::CalpontSystemCatalog::ColType& colType, messageqcpp::ByteStream& bs); void prep(int8_t outputType, bool absRids) override; void loadData() override; void process_OT_BOTH() override; void process_OT_DATAVALUE() override; void projectResultRG(rowgroup::RowGroup& rg, uint32_t pos) override; void issuePrimitive() override; }; class ColumnCommandInt16 : public ColumnCommand { public: static constexpr uint8_t size = 2; using IntegralType = datatypes::WidthToSIntegralType::type; ColumnCommandInt16() : ColumnCommand(){}; ColumnCommandInt16(execplan::CalpontSystemCatalog::ColType& colType, messageqcpp::ByteStream& bs); void prep(int8_t outputType, bool absRids) override; void loadData() override; void process_OT_BOTH() override; void process_OT_DATAVALUE() override; void projectResultRG(rowgroup::RowGroup& rg, uint32_t pos) override; void issuePrimitive() override; }; class ColumnCommandInt32 : public ColumnCommand { public: static constexpr uint8_t size = 4; using IntegralType = datatypes::WidthToSIntegralType::type; ColumnCommandInt32() : ColumnCommand(){}; ColumnCommandInt32(execplan::CalpontSystemCatalog::ColType& colType, messageqcpp::ByteStream& bs); void prep(int8_t outputType, bool absRids) override; void loadData() override; void process_OT_BOTH() override; void process_OT_DATAVALUE() override; void projectResultRG(rowgroup::RowGroup& rg, uint32_t pos) override; void issuePrimitive() override; }; class ColumnCommandInt64 : public ColumnCommand { public: static constexpr uint8_t size = 8; using IntegralType = datatypes::WidthToSIntegralType::type; ColumnCommandInt64() : ColumnCommand(){}; ColumnCommandInt64(execplan::CalpontSystemCatalog::ColType& colType, messageqcpp::ByteStream& bs); void prep(int8_t outputType, bool absRids) override; void loadData() override; void process_OT_BOTH() override; void process_OT_DATAVALUE() override; void projectResultRG(rowgroup::RowGroup& rg, uint32_t pos) override; void issuePrimitive() override; }; class ColumnCommandInt128 : public ColumnCommand { public: static constexpr uint8_t size = 16; using IntegralType = datatypes::WidthToSIntegralType::type; ColumnCommandInt128() : ColumnCommand(){}; ColumnCommandInt128(execplan::CalpontSystemCatalog::ColType& colType, messageqcpp::ByteStream& bs); void prep(int8_t outputType, bool absRids) override; void loadData() override; void process_OT_BOTH() override; void process_OT_DATAVALUE() override; void projectResultRG(rowgroup::RowGroup& rg, uint32_t pos) override; void issuePrimitive() override; }; class ColumnCommandFabric { public: ColumnCommandFabric() = default; static ColumnCommand* createCommand(messageqcpp::ByteStream& bs); static ColumnCommand* duplicate(const ColumnCommandUniquePtr& rhs); }; template inline void ColumnCommand::fillEmptyBlock(uint8_t* dst, const uint8_t* emptyValue, const uint32_t number) const { T* typedDst = reinterpret_cast(dst); const T* typedEmptyValue = reinterpret_cast(emptyValue); for (uint32_t idx = 0; idx < number; idx = idx + 4) { typedDst[idx] = *typedEmptyValue; typedDst[idx + 1] = *typedEmptyValue; typedDst[idx + 2] = *typedEmptyValue; typedDst[idx + 3] = *typedEmptyValue; } } template <> inline void ColumnCommand::fillEmptyBlock(uint8_t* dst, const uint8_t* emptyValue, const uint32_t number) const { for (uint32_t idx = 0; idx < number; idx++) { datatypes::TSInt128::assignPtrPtr(dst + idx * sizeof(messageqcpp::ByteStream::hexbyte), emptyValue); } } } // namespace primitiveprocessor