diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.cpp b/dbcon/joblist/batchprimitiveprocessor-jl.cpp index 7b258c4de..1faccda6e 100644 --- a/dbcon/joblist/batchprimitiveprocessor-jl.cpp +++ b/dbcon/joblist/batchprimitiveprocessor-jl.cpp @@ -90,12 +90,15 @@ BatchPrimitiveProcessorJL::~BatchPrimitiveProcessorJL() { } -void BatchPrimitiveProcessorJL::addFilterStep(const pColScanStep& scan, vector lastScannedLBID) +void BatchPrimitiveProcessorJL::addFilterStep(const pColScanStep& scan, + vector lastScannedLBID, + bool hasAuxCol, + const std::vector& extentsAux) { SCommand cc; tableOID = scan.tableOid(); - cc.reset(new ColumnCommandJL(scan, lastScannedLBID)); + cc.reset(new ColumnCommandJL(scan, lastScannedLBID, hasAuxCol, extentsAux)); cc->setBatchPrimitiveProcessor(this); cc->setQueryUuid(scan.queryUuid()); cc->setStepUuid(uuid); diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.h b/dbcon/joblist/batchprimitiveprocessor-jl.h index b0b9a7ae6..6a76332e2 100644 --- a/dbcon/joblist/batchprimitiveprocessor-jl.h +++ b/dbcon/joblist/batchprimitiveprocessor-jl.h @@ -115,7 +115,8 @@ class BatchPrimitiveProcessorJL threadCount = tc; } - void addFilterStep(const pColScanStep&, std::vector lastScannedLBID); + void addFilterStep(const pColScanStep&, std::vector lastScannedLBID, + bool hasAuxCol, const std::vector& extentsAux); void addFilterStep(const PseudoColStep&); void addFilterStep(const pColStep&); void addFilterStep(const pDictionaryStep&); diff --git a/dbcon/joblist/columncommand-jl.cpp b/dbcon/joblist/columncommand-jl.cpp index bc441168b..4b56bda67 100644 --- a/dbcon/joblist/columncommand-jl.cpp +++ b/dbcon/joblist/columncommand-jl.cpp @@ -43,7 +43,9 @@ using namespace messageqcpp; namespace joblist { -ColumnCommandJL::ColumnCommandJL(const pColScanStep& scan, vector lastLBID) +ColumnCommandJL::ColumnCommandJL(const pColScanStep& scan, vector lastLBID, + bool hasAuxCol_, const std::vector& extentsAux_) : + extentsAux(extentsAux_), hasAuxCol(hasAuxCol_) { BRM::DBRM dbrm; isScan = true; @@ -88,6 +90,7 @@ ColumnCommandJL::ColumnCommandJL(const pColStep& step) BRM::DBRM dbrm; isScan = false; + hasAuxCol = false; /* grab necessary vars from step */ traceFlags = step.fTraceFlags; @@ -210,6 +213,10 @@ void ColumnCommandJL::createCommand(ByteStream& bs) const bs << BOP; bs << filterCount; } + if (hasAuxCol) + bs << (uint8_t)1; + else + bs << (uint8_t)0; serializeInlineVector(bs, fLastLbid); CommandJL::createCommand(bs); @@ -218,6 +225,9 @@ void ColumnCommandJL::createCommand(ByteStream& bs) const void ColumnCommandJL::runCommand(ByteStream& bs) const { bs << lbid; + + if (hasAuxCol) + bs << lbidAux; } void ColumnCommandJL::setLBID(uint64_t rid, uint32_t dbRoot) @@ -247,11 +257,26 @@ void ColumnCommandJL::setLBID(uint64_t rid, uint32_t dbRoot) "; blockNum = " << blockNum << "; OID=" << OID << " LBID=" << lbid; cout << os.str() << endl; */ - return; + break; } } - throw logic_error("ColumnCommandJL: setLBID didn't find the extent for the rid."); + uint32_t j; + + for (j = 0; j < extentsAux.size(); j++) + { + if (extentsAux[j].dbRoot == dbRoot && extentsAux[j].partitionNum == partNum && + extentsAux[j].segmentNum == segNum && extentsAux[j].blockOffset == (extentNum * 1 * 1024)) + { + lbidAux = extentsAux[j].range.start + (blockNum * 1); + break; + } + } + + if (i == extents.size() || (hasAuxCol && j == extentsAux.size())) + { + throw logic_error("ColumnCommandJL: setLBID didn't find the extent for the rid."); + } // ostringstream os; // os << "CCJL: rid=" << rid << "; dbroot=" << dbRoot << "; partitionNum=" << partitionNum << "; diff --git a/dbcon/joblist/columncommand-jl.h b/dbcon/joblist/columncommand-jl.h index 7f700766e..f72fff1de 100644 --- a/dbcon/joblist/columncommand-jl.h +++ b/dbcon/joblist/columncommand-jl.h @@ -40,7 +40,8 @@ namespace joblist class ColumnCommandJL : public CommandJL { public: - ColumnCommandJL(const pColScanStep&, std::vector lastLBID); + ColumnCommandJL(const pColScanStep&, std::vector lastLBID, + bool hasAuxCol_, const std::vector& extentsAux_); ColumnCommandJL(const pColStep&); ColumnCommandJL(const ColumnCommandJL&, const DictStepJL&); virtual ~ColumnCommandJL(); @@ -123,6 +124,10 @@ class ColumnCommandJL : public CommandJL uint32_t numDBRoots; uint32_t dbroot; + std::vector extentsAux; + bool hasAuxCol; + uint64_t lbidAux; + static const unsigned DEFAULT_FILES_PER_COLUMN_PARTITION = 32; public: diff --git a/dbcon/joblist/joblistfactory.cpp b/dbcon/joblist/joblistfactory.cpp index 36f263dfc..01c6fa29c 100644 --- a/dbcon/joblist/joblistfactory.cpp +++ b/dbcon/joblist/joblistfactory.cpp @@ -2313,7 +2313,6 @@ SJLP JobListFactory::makeJobList(CalpontExecutionPlan* cplan, ResourceManager* r ret->errorInfo(errorInfo); } - std::cout<toString()< extentsAux; uint64_t fLastTupleId; BRM::LBIDRange_v lbidRanges; std::vector lastExtent; diff --git a/dbcon/joblist/tuple-bps.cpp b/dbcon/joblist/tuple-bps.cpp index d73cd299a..c2384df79 100644 --- a/dbcon/joblist/tuple-bps.cpp +++ b/dbcon/joblist/tuple-bps.cpp @@ -501,6 +501,7 @@ TupleBPS::TupleBPS(const pColStep& rhs, const JobInfo& jobInfo) fRunExecuted = false; fSwallowRows = false; smallOuterJoiner = -1; + hasAuxCol = false; // @1098 initialize scanFlags to be true scanFlags.assign(numExtents, true); @@ -528,6 +529,25 @@ TupleBPS::TupleBPS(const pColScanStep& rhs, const JobInfo& jobInfo) : BatchPrimi fTableOid = rhs.tableOid(); extentSize = rhs.extentSize; lbidRanges = rhs.lbidRanges; + hasAuxCol = false; + + // TODO MCOL-5021 Add try-catch block + if (fTableOid >= 3000) + { + execplan::CalpontSystemCatalog::TableName tableName = jobInfo.csc->tableName(fTableOid); + fOidAux = jobInfo.csc->tableAUXColumnOID(tableName); + + if (fOidAux > 3000) + { + hasAuxCol = true; + + if (dbrm.getExtents(fOidAux, extentsAux)) + throw runtime_error("TupleBPS::TupleBPS BRM extent lookup failure (1)"); + + idbassert(!extentsAux.empty()); + sort(extentsAux.begin(), extentsAux.end(), BRM::ExtentSorter()); + } + } /* These lines are obsoleted by initExtentMarkers. Need to remove & retest. */ scannedExtents = rhs.extents; @@ -650,6 +670,7 @@ TupleBPS::TupleBPS(const PassThruStep& rhs, const JobInfo& jobInfo) : BatchPrimi fRunExecuted = false; isFilterFeeder = false; smallOuterJoiner = -1; + hasAuxCol = false; // @1098 initialize scanFlags to be true scanFlags.assign(numExtents, true); @@ -719,6 +740,7 @@ TupleBPS::TupleBPS(const pDictionaryStep& rhs, const JobInfo& jobInfo) scanFlags.assign(numExtents, true); runtimeCPFlags.assign(numExtents, true); bop = BOP_AND; + hasAuxCol = false; runRan = joinRan = false; fDelivery = false; @@ -827,7 +849,7 @@ void TupleBPS::setBPP(JobStep* jobStep) if (pcss != 0) { - fBPP->addFilterStep(*pcss, lastScannedLBID); + fBPP->addFilterStep(*pcss, lastScannedLBID, hasAuxCol, extentsAux); extentsMap[pcss->fOid] = tr1::unordered_map(); tr1::unordered_map& ref = extentsMap[pcss->fOid]; @@ -1257,6 +1279,7 @@ void TupleBPS::initExtentMarkers() } } +// TODO MCOL-5021 Add support here void TupleBPS::reloadExtentLists() { /* diff --git a/primitives/linux-port/column.cpp b/primitives/linux-port/column.cpp index 4c060c7f6..0bc141b21 100644 --- a/primitives/linux-port/column.cpp +++ b/primitives/linux-port/column.cpp @@ -57,6 +57,85 @@ namespace { using MT = uint16_t; +const MT nonEmptyMask2Byte[256] = +{ + 0x0000, 0x0003, 0x000C, 0x000F, 0x0030, 0x0033, 0x003C, 0x003F, + 0x00C0, 0x00C3, 0x00CC, 0x00CF, 0x00F0, 0x00F3, 0x00FC, 0x00FF, + 0x0300, 0x0303, 0x030C, 0x030F, 0x0330, 0x0333, 0x033C, 0x033F, + 0x03C0, 0x03C3, 0x03CC, 0x03CF, 0x03F0, 0x03F3, 0x03FC, 0x03FF, + 0x0C00, 0x0C03, 0x0C0C, 0x0C0F, 0x0C30, 0x0C33, 0x0C3C, 0x0C3F, + 0x0CC0, 0x0CC3, 0x0CCC, 0x0CCF, 0x0CF0, 0x0CF3, 0x0CFC, 0x0CFF, + 0x0F00, 0x0F03, 0x0F0C, 0x0F0F, 0x0F30, 0x0F33, 0x0F3C, 0x0F3F, + 0x0FC0, 0x0FC3, 0x0FCC, 0x0FCF, 0x0FF0, 0x0FF3, 0x0FFC, 0x0FFF, + 0x3000, 0x3003, 0x300C, 0x300F, 0x3030, 0x3033, 0x303C, 0x303F, + 0x30C0, 0x30C3, 0x30CC, 0x30CF, 0x30F0, 0x30F3, 0x30FC, 0x30FF, + 0x3300, 0x3303, 0x330C, 0x330F, 0x3330, 0x3333, 0x333C, 0x333F, + 0x33C0, 0x33C3, 0x33CC, 0x33CF, 0x33F0, 0x33F3, 0x33FC, 0x33FF, + 0x3C00, 0x3C03, 0x3C0C, 0x3C0F, 0x3C30, 0x3C33, 0x3C3C, 0x3C3F, + 0x3CC0, 0x3CC3, 0x3CCC, 0x3CCF, 0x3CF0, 0x3CF3, 0x3CFC, 0x3CFF, + 0x3F00, 0x3F03, 0x3F0C, 0x3F0F, 0x3F30, 0x3F33, 0x3F3C, 0x3F3F, + 0x3FC0, 0x3FC3, 0x3FCC, 0x3FCF, 0x3FF0, 0x3FF3, 0x3FFC, 0x3FFF, + 0xC000, 0xC003, 0xC00C, 0xC00F, 0xC030, 0xC033, 0xC03C, 0xC03F, + 0xC0C0, 0xC0C3, 0xC0CC, 0xC0CF, 0xC0F0, 0xC0F3, 0xC0FC, 0xC0FF, + 0xC300, 0xC303, 0xC30C, 0xC30F, 0xC330, 0xC333, 0xC33C, 0xC33F, + 0xC3C0, 0xC3C3, 0xC3CC, 0xC3CF, 0xC3F0, 0xC3F3, 0xC3FC, 0xC3FF, + 0xCC00, 0xCC03, 0xCC0C, 0xCC0F, 0xCC30, 0xCC33, 0xCC3C, 0xCC3F, + 0xCCC0, 0xCCC3, 0xCCCC, 0xCCCF, 0xCCF0, 0xCCF3, 0xCCFC, 0xCCFF, + 0xCF00, 0xCF03, 0xCF0C, 0xCF0F, 0xCF30, 0xCF33, 0xCF3C, 0xCF3F, + 0xCFC0, 0xCFC3, 0xCFCC, 0xCFCF, 0xCFF0, 0xCFF3, 0xCFFC, 0xCFFF, + 0xF000, 0xF003, 0xF00C, 0xF00F, 0xF030, 0xF033, 0xF03C, 0xF03F, + 0xF0C0, 0xF0C3, 0xF0CC, 0xF0CF, 0xF0F0, 0xF0F3, 0xF0FC, 0xF0FF, + 0xF300, 0xF303, 0xF30C, 0xF30F, 0xF330, 0xF333, 0xF33C, 0xF33F, + 0xF3C0, 0xF3C3, 0xF3CC, 0xF3CF, 0xF3F0, 0xF3F3, 0xF3FC, 0xF3FF, + 0xFC00, 0xFC03, 0xFC0C, 0xFC0F, 0xFC30, 0xFC33, 0xFC3C, 0xFC3F, + 0xFCC0, 0xFCC3, 0xFCCC, 0xFCCF, 0xFCF0, 0xFCF3, 0xFCFC, 0xFCFF, + 0xFF00, 0xFF03, 0xFF0C, 0xFF0F, 0xFF30, 0xFF33, 0xFF3C, 0xFF3F, + 0xFFC0, 0xFFC3, 0xFFCC, 0xFFCF, 0xFFF0, 0xFFF3, 0xFFFC, 0xFFFF +}; + +const MT nonEmptyMask4Byte[16] = +{ + 0x0000, 0x000F, 0x00F0, 0x00FF, + 0x0F00, 0x0F0F, 0x0FF0, 0x0FFF, + 0xF000, 0xF00F, 0xF0F0, 0xF0FF, + 0xFF00, 0xFF0F, 0xFFF0, 0xFFFF +}; + +const MT nonEmptyMask8Byte[4] = +{ + 0x0000, 0x00FF, 0xFF00, 0xFFFF +}; + +const MT nonEmptyMask16Byte[2] = +{ + 0x0000, 0xFFFF +}; + +inline MT getNonEmptyMask1Byte(MT* nonEmptyMaskAux, uint16_t iter) +{ + return nonEmptyMaskAux[iter]; +} + +inline MT getNonEmptyMask2Byte(MT* nonEmptyMaskAux, uint16_t iter) +{ + return nonEmptyMask2Byte[(nonEmptyMaskAux[iter >> 1] >> ((iter & 0x0001) << 3)) & 0x00FF]; +} + +inline MT getNonEmptyMask4Byte(MT* nonEmptyMaskAux, uint16_t iter) +{ + return nonEmptyMask4Byte[(nonEmptyMaskAux[iter >> 2] >> ((iter & 0x0003) << 2)) & 0x000F]; +} + +inline MT getNonEmptyMask8Byte(MT* nonEmptyMaskAux, uint16_t iter) +{ + return nonEmptyMask8Byte[(nonEmptyMaskAux[iter >> 3] >> ((iter & 0x0007) << 1)) & 0x0003]; +} + +inline MT getNonEmptyMask16Byte(MT* nonEmptyMaskAux, uint16_t iter) +{ + return nonEmptyMask16Byte[(nonEmptyMaskAux[iter >> 4] >> (iter & 0x000F)) & 0x0001]; +} + inline uint64_t order_swap(uint64_t x) { uint64_t ret = (x >> 56) | ((x << 40) & 0x00FF000000000000ULL) | ((x << 24) & 0x0000FF0000000000ULL) | @@ -842,7 +921,9 @@ inline bool nextColValue( const uint16_t* ridArray, // Optional array of indexes into srcArray, that defines the read order const uint16_t ridSize, // ... and its size const uint8_t OutputType, // Used to decide whether to skip EMPTY values - T EMPTY_VALUE) + T EMPTY_VALUE, + const uint8_t* blockAux, + uint8_t EMPTY_VALUE_AUX) { auto i = *index; // local copy of *index to speed up loops T value; // value to be written into *result, local for the same reason @@ -897,6 +978,75 @@ inline bool nextColValue( return true; } +template +inline bool nextColValueAux( + T& result, // Place for the value returned + bool* isEmpty, // ... and flag whether it's EMPTY + uint32_t* + index, // Successive index either in srcArray (going from 0 to srcSize-1) or ridArray (0..ridSize-1) + uint16_t* rid, // Index in srcArray of the value returned + const T* srcArray, // Input array + const uint32_t srcSize, // ... and its size + const uint16_t* ridArray, // Optional array of indexes into srcArray, that defines the read order + const uint16_t ridSize, // ... and its size + const uint8_t OutputType, // Used to decide whether to skip EMPTY values + T EMPTY_VALUE, + const uint8_t* blockAux, + uint8_t EMPTY_VALUE_AUX) +{ + auto i = *index; // local copy of *index to speed up loops + uint8_t valueAux; + + if (ridArray) + { + // Read next non-empty value in the order defined by ridArray + for (;; i++) + { + if (UNLIKELY(i >= ridSize)) + return false; + + valueAux = blockAux[ridArray[i]]; + + if (valueAux != EMPTY_VALUE_AUX) + break; + } + + *rid = ridArray[i]; + *isEmpty = false; + } + else if (OutputType & OT_RID) // TODO: check correctness of this condition for SKIP_EMPTY_VALUES + { + // Read next non-empty value in the natural order + for (;; i++) + { + if (UNLIKELY(i >= srcSize)) + return false; + + valueAux = blockAux[i]; + + if (valueAux != EMPTY_VALUE_AUX) + break; + } + + *rid = i; + *isEmpty = false; + } + else + { + // Read next value in the natural order + if (UNLIKELY(i >= srcSize)) + return false; + + *rid = i; + valueAux = blockAux[i]; + *isEmpty = (valueAux == EMPTY_VALUE_AUX); + } + + *index = i + 1; + result = srcArray[*rid]; + return true; +} + /// /// WRITE COLUMN VALUES /// @@ -1179,7 +1329,9 @@ void scalarFiltering( const bool validMinMax, // The flag to store min/max T emptyValue, // Deduced empty value magic T nullValue, // Deduced null value magic - T Min, T Max, const bool isNullValueMatches) + T Min, T Max, const bool isNullValueMatches, + const bool hasAuxCol, const uint8_t* blockAux, + uint8_t emptyValueAux) { constexpr int WIDTH = sizeof(T); // Loop-local variables @@ -1187,9 +1339,12 @@ void scalarFiltering( primitives::RIDType rid = 0; bool isEmpty = false; + auto nextColValuePtr = hasAuxCol ? nextColValueAux : nextColValue; + // Loop over the column values, storing those matching the filter, and updating the min..max range - for (uint32_t i = initialRID; nextColValue(curValue, &isEmpty, &i, &rid, srcArray, srcSize, - ridArray, ridSize, outputType, emptyValue);) + for (uint32_t i = initialRID; (*nextColValuePtr)(curValue, &isEmpty, &i, &rid, srcArray, srcSize, + ridArray, ridSize, outputType, emptyValue, + blockAux, emptyValueAux);) { if (isEmpty) continue; @@ -1348,7 +1503,8 @@ template(typeHolder, simdProcessor, simdMin).v; weightsMax = simdSwapedOrderDataLoad(typeHolder, simdProcessor, simdMax).v; } - // main loop - // writeMask tells which values must get into the result. Includes values that matches filters. Can have - // NULLs. nonEmptyMask tells which vector coords are not EMPTY magics. nonNullMask tells which vector coords - // are not NULL magics. - for (uint16_t i = 0; i < iterNumber; ++i) + + if (hasAuxCol) { - primitives::RIDType ridOffset = i * VECTOR_SIZE; - assert(!HAS_INPUT_RIDS || (HAS_INPUT_RIDS && ridSize >= ridOffset)); - dataVec = simdDataLoad(simdProcessor, srcArray, - origSrcArray, ridArray, i).v; - if constexpr(KIND==KIND_TEXT) - swapedOrderDataVec = simdSwapedOrderDataLoad(typeHolder, simdProcessor, dataVec).v; - nonEmptyMask = simdProcessor.nullEmptyCmpNe(dataVec, emptyFilterArgVec); - writeMask = nonEmptyMask; - // NULL check - nonNullMask = simdProcessor.nullEmptyCmpNe(dataVec, nullFilterArgVec); - // Exclude NULLs from the resulting set if NULL doesn't match the filters. - writeMask = isNullValueMatches ? writeMask : writeMask & nonNullMask; - nonNullOrEmptyMask = nonNullMask & nonEmptyMask; - // filters - MT prevFilterMask = initFilterMask; - // TODO name this mask literal - MT filterMask = 0xFFFF; - for (uint32_t j = 0; j < filterCount; ++j) + using SimdTypeTemp = typename simd::IntegralToSIMD::type; + using FilterTypeTemp = typename simd::StorageToFiltering::type; + using VTAux = typename simd::SimdFilterProcessor; + using SimdTypeAux = typename VTAux::SimdType; + using SimdWrapperTypeAux = typename VTAux::SimdWrapperType; + VTAux simdProcessorAux; + SimdTypeAux dataVecAux; + SimdTypeAux emptyFilterArgVecAux = simdProcessorAux.emptyNullLoadValue(emptyValueAux); + const uint8_t* origBlockAux = blockAux; + constexpr uint16_t VECTOR_SIZE_AUX = VT::vecByteSize; + uint16_t iterNumberAux = HAS_INPUT_RIDS ? ridSize / VECTOR_SIZE_AUX : srcSize / VECTOR_SIZE_AUX; + MT* nonEmptyMaskAux = (MT*) alloca(sizeof(MT) * iterNumberAux); + primitives::RIDType* origRidArray = ridArray; + + for (uint16_t i = 0; i < iterNumberAux; ++i) { - // filter using compiled filter and preloaded filter argument - if constexpr(KIND==KIND_TEXT) - filterMask = copFunctorVec[j](simdProcessor, swapedOrderDataVec, filterArgsVectors[j]); - else - filterMask = copFunctorVec[j](simdProcessor, dataVec, filterArgsVectors[j]); - - filterMask = bopFunctor(prevFilterMask, filterMask); - prevFilterMask = filterMask; + dataVecAux = simdDataLoadTemplate(simdProcessorAux, blockAux, + origBlockAux, ridArray, i) + .v; + nonEmptyMaskAux[i] = simdProcessorAux.nullEmptyCmpNe(dataVecAux, emptyFilterArgVecAux); + blockAux += VECTOR_SIZE_AUX; + ridArray += VECTOR_SIZE_AUX; } - writeMask = writeMask & filterMask; - T* dataVecTPtr = reinterpret_cast(&dataVec); + ridArray = origRidArray; - // vectWriteColValues iterates over the values in the source vec - // to store values/RIDs into dstArray/ridDstArray. - // It also sets Min/Max values for the block if eligible. - // !!! vectWriteColValues increases ridDstArray internally but it doesn't go - // outside the scope of the memory allocated to out msg. - // vectWriteColValues is empty if outputMode == OT_RID. - uint16_t valuesWritten = vectWriteColValues( - simdProcessor, writeMask, nonNullOrEmptyMask, validMinMax, ridOffset, dataVecTPtr, dstArray, min, max, - in, out, ridDstArray, ridArray); - // Some outputType modes saves RIDs also. vectWriteRIDValues is empty for - // OT_DATAVALUE, OT_BOTH(vectWriteColValues takes care about RIDs). - valuesWritten = vectWriteRIDValues( - simdProcessor, valuesWritten, validMinMax, ridOffset, dataVecTPtr, ridDstArray, writeMask, min, max, - in, out, nonNullOrEmptyMask, ridArray); + MT (*getNonEmptyMaskPtr)(MT*, uint16_t); - if constexpr (KIND != KIND_TEXT) - vectorizedUpdateMinMax(validMinMax, nonNullOrEmptyMask, simdProcessor, dataVec, simdMin, simdMax); - else - vectorizedTextUpdateMinMax(validMinMax, nonNullOrEmptyMask, simdProcessor, dataVec, simdMin, simdMax, - swapedOrderDataVec, weightsMin, weightsMax); + switch(WIDTH) + { + case 1: + getNonEmptyMaskPtr = getNonEmptyMask1Byte; + break; + case 2: + getNonEmptyMaskPtr = getNonEmptyMask2Byte; + break; + case 4: + getNonEmptyMaskPtr = getNonEmptyMask4Byte; + break; + case 8: + getNonEmptyMaskPtr = getNonEmptyMask8Byte; + break; + case 16: + getNonEmptyMaskPtr = getNonEmptyMask16Byte; + break; + } - // Calculate bytes written - uint16_t bytesWritten = valuesWritten * WIDTH; - totalValuesWritten += valuesWritten; - ridDstArray += valuesWritten; - dstArray += bytesWritten; - rid += VECTOR_SIZE; - srcArray += VECTOR_SIZE; - ridArray += VECTOR_SIZE; + // main loop + // writeMask tells which values must get into the result. Includes values that matches filters. Can have + // NULLs. nonEmptyMask tells which vector coords are not EMPTY magics. nonNullMask tells which vector coords + // are not NULL magics. + for (uint16_t i = 0; i < iterNumber; ++i) + { + primitives::RIDType ridOffset = i * VECTOR_SIZE; + assert(!HAS_INPUT_RIDS || (HAS_INPUT_RIDS && ridSize >= ridOffset)); + dataVec = simdDataLoad(simdProcessor, srcArray, + origSrcArray, ridArray, i).v; + if constexpr(KIND==KIND_TEXT) + swapedOrderDataVec = simdSwapedOrderDataLoad(typeHolder, simdProcessor, dataVec).v; + nonEmptyMask = (*getNonEmptyMaskPtr)(nonEmptyMaskAux, i); + writeMask = nonEmptyMask; + // NULL check + nonNullMask = simdProcessor.nullEmptyCmpNe(dataVec, nullFilterArgVec); + // Exclude NULLs from the resulting set if NULL doesn't match the filters. + writeMask = isNullValueMatches ? writeMask : writeMask & nonNullMask; + nonNullOrEmptyMask = nonNullMask & nonEmptyMask; + // filters + MT prevFilterMask = initFilterMask; + // TODO name this mask literal + MT filterMask = 0xFFFF; + for (uint32_t j = 0; j < filterCount; ++j) + { + // filter using compiled filter and preloaded filter argument + if constexpr(KIND==KIND_TEXT) + filterMask = copFunctorVec[j](simdProcessor, swapedOrderDataVec, filterArgsVectors[j]); + else + filterMask = copFunctorVec[j](simdProcessor, dataVec, filterArgsVectors[j]); + + filterMask = bopFunctor(prevFilterMask, filterMask); + prevFilterMask = filterMask; + } + writeMask = writeMask & filterMask; + + T* dataVecTPtr = reinterpret_cast(&dataVec); + + // vectWriteColValues iterates over the values in the source vec + // to store values/RIDs into dstArray/ridDstArray. + // It also sets min/max values for the block if eligible. + // !!! vectWriteColValues increases ridDstArray internally but it doesn't go + // outside the scope of the memory allocated to out msg. + // vectWriteColValues is empty if outputMode == OT_RID. + uint16_t valuesWritten = vectWriteColValues( + simdProcessor, writeMask, nonNullOrEmptyMask, validMinMax, ridOffset, dataVecTPtr, dstArray, min, max, + in, out, ridDstArray, ridArray); + // Some outputType modes saves RIDs also. vectWriteRIDValues is empty for + // OT_DATAVALUE, OT_BOTH(vectWriteColValues takes care about RIDs). + valuesWritten = vectWriteRIDValues( + simdProcessor, valuesWritten, validMinMax, ridOffset, dataVecTPtr, ridDstArray, writeMask, min, max, + in, out, nonNullOrEmptyMask, ridArray); + + if constexpr (KIND != KIND_TEXT) + vectorizedUpdateMinMax(validMinMax, nonNullOrEmptyMask, simdProcessor, dataVec, simdMin, simdMax); + else + vectorizedTextUpdateMinMax(validMinMax, nonNullOrEmptyMask, simdProcessor, dataVec, simdMin, simdMax, + swapedOrderDataVec, weightsMin, weightsMax); + + // Calculate bytes written + uint16_t bytesWritten = valuesWritten * WIDTH; + totalValuesWritten += valuesWritten; + ridDstArray += valuesWritten; + dstArray += bytesWritten; + rid += VECTOR_SIZE; + srcArray += VECTOR_SIZE; + ridArray += VECTOR_SIZE; + } } + else + { + // main loop + // writeMask tells which values must get into the result. Includes values that matches filters. Can have + // NULLs. nonEmptyMask tells which vector coords are not EMPTY magics. nonNullMask tells which vector coords + // are not NULL magics. + for (uint16_t i = 0; i < iterNumber; ++i) + { + primitives::RIDType ridOffset = i * VECTOR_SIZE; + assert(!HAS_INPUT_RIDS || (HAS_INPUT_RIDS && ridSize >= ridOffset)); + dataVec = simdDataLoad(simdProcessor, srcArray, + origSrcArray, ridArray, i).v; + if constexpr(KIND==KIND_TEXT) + swapedOrderDataVec = simdSwapedOrderDataLoad(typeHolder, simdProcessor, dataVec).v; + nonEmptyMask = simdProcessor.nullEmptyCmpNe(dataVec, emptyFilterArgVec); + writeMask = nonEmptyMask; + // NULL check + nonNullMask = simdProcessor.nullEmptyCmpNe(dataVec, nullFilterArgVec); + // Exclude NULLs from the resulting set if NULL doesn't match the filters. + writeMask = isNullValueMatches ? writeMask : writeMask & nonNullMask; + nonNullOrEmptyMask = nonNullMask & nonEmptyMask; + // filters + MT prevFilterMask = initFilterMask; + // TODO name this mask literal + MT filterMask = 0xFFFF; + for (uint32_t j = 0; j < filterCount; ++j) + { + // filter using compiled filter and preloaded filter argument + if constexpr(KIND==KIND_TEXT) + filterMask = copFunctorVec[j](simdProcessor, swapedOrderDataVec, filterArgsVectors[j]); + else + filterMask = copFunctorVec[j](simdProcessor, dataVec, filterArgsVectors[j]); + + filterMask = bopFunctor(prevFilterMask, filterMask); + prevFilterMask = filterMask; + } + writeMask = writeMask & filterMask; + + T* dataVecTPtr = reinterpret_cast(&dataVec); + + // vectWriteColValues iterates over the values in the source vec + // to store values/RIDs into dstArray/ridDstArray. + // It also sets min/max values for the block if eligible. + // !!! vectWriteColValues increases ridDstArray internally but it doesn't go + // outside the scope of the memory allocated to out msg. + // vectWriteColValues is empty if outputMode == OT_RID. + uint16_t valuesWritten = vectWriteColValues( + simdProcessor, writeMask, nonNullOrEmptyMask, validMinMax, ridOffset, dataVecTPtr, dstArray, min, max, + in, out, ridDstArray, ridArray); + // Some outputType modes saves RIDs also. vectWriteRIDValues is empty for + // OT_DATAVALUE, OT_BOTH(vectWriteColValues takes care about RIDs). + valuesWritten = vectWriteRIDValues( + simdProcessor, valuesWritten, validMinMax, ridOffset, dataVecTPtr, ridDstArray, writeMask, min, max, + in, out, nonNullOrEmptyMask, ridArray); + + if constexpr (KIND != KIND_TEXT) + vectorizedUpdateMinMax(validMinMax, nonNullOrEmptyMask, simdProcessor, dataVec, simdMin, simdMax); + else + vectorizedTextUpdateMinMax(validMinMax, nonNullOrEmptyMask, simdProcessor, dataVec, simdMin, simdMax, + swapedOrderDataVec, weightsMin, weightsMax); + + // Calculate bytes written + uint16_t bytesWritten = valuesWritten * WIDTH; + totalValuesWritten += valuesWritten; + ridDstArray += valuesWritten; + dstArray += bytesWritten; + rid += VECTOR_SIZE; + srcArray += VECTOR_SIZE; + ridArray += VECTOR_SIZE; + } + } + if constexpr (KIND != KIND_TEXT) extractMinMax(simdProcessor, simdMin, simdMax, min, max); else @@ -1567,7 +1848,7 @@ void vectorizedFiltering(NewColRequestHeader* in, ColResultHeader* out, const T* scalarFiltering(in, out, columnFilterMode, filterSet, filterCount, filterCOPs, filterValues, filterRFs, in->colType, origSrcArray, srcSize, origRidArray, ridSize, processedSoFar, outputType, validMinMax, emptyValue, nullValue, - min, max, isNullValueMatches); + min, max, isNullValueMatches, hasAuxCol, blockAux, emptyValueAux); } // This routine dispatches template function calls to reduce branching. @@ -1577,7 +1858,8 @@ void vectorizedFilteringDispatcher(NewColRequestHeader* in, ColResultHeader* out const uint16_t ridSize, ParsedColumnFilter* parsedColumnFilter, const bool validMinMax, const STORAGE_TYPE emptyValue, const STORAGE_TYPE nullValue, STORAGE_TYPE Min, STORAGE_TYPE Max, - const bool isNullValueMatches) + const bool isNullValueMatches, + const bool hasAuxCol, const uint8_t* blockAux, uint8_t emptyValueAux) { // Using struct to dispatch SIMD type based on integral type T. using SimdType = typename simd::IntegralToSIMD::type; @@ -1592,22 +1874,22 @@ void vectorizedFilteringDispatcher(NewColRequestHeader* in, ColResultHeader* out case OT_RID: vectorizedFiltering( in, out, srcArray, srcSize, ridArray, ridSize, parsedColumnFilter, validMinMax, emptyValue, - nullValue, Min, Max, isNullValueMatches); + nullValue, Min, Max, isNullValueMatches, hasAuxCol, blockAux, emptyValueAux); break; case OT_BOTH: vectorizedFiltering( in, out, srcArray, srcSize, ridArray, ridSize, parsedColumnFilter, validMinMax, emptyValue, - nullValue, Min, Max, isNullValueMatches); + nullValue, Min, Max, isNullValueMatches, hasAuxCol, blockAux, emptyValueAux); break; case OT_TOKEN: vectorizedFiltering( in, out, srcArray, srcSize, ridArray, ridSize, parsedColumnFilter, validMinMax, emptyValue, - nullValue, Min, Max, isNullValueMatches); + nullValue, Min, Max, isNullValueMatches, hasAuxCol, blockAux, emptyValueAux); break; case OT_DATAVALUE: vectorizedFiltering( in, out, srcArray, srcSize, ridArray, ridSize, parsedColumnFilter, validMinMax, emptyValue, - nullValue, Min, Max, isNullValueMatches); + nullValue, Min, Max, isNullValueMatches, hasAuxCol, blockAux, emptyValueAux); break; } } @@ -1619,22 +1901,22 @@ void vectorizedFilteringDispatcher(NewColRequestHeader* in, ColResultHeader* out case OT_RID: vectorizedFiltering( in, out, srcArray, srcSize, ridArray, ridSize, parsedColumnFilter, validMinMax, emptyValue, - nullValue, Min, Max, isNullValueMatches); + nullValue, Min, Max, isNullValueMatches, hasAuxCol, blockAux, emptyValueAux); break; case OT_BOTH: vectorizedFiltering( in, out, srcArray, srcSize, ridArray, ridSize, parsedColumnFilter, validMinMax, emptyValue, - nullValue, Min, Max, isNullValueMatches); + nullValue, Min, Max, isNullValueMatches, hasAuxCol, blockAux, emptyValueAux); break; case OT_TOKEN: vectorizedFiltering( in, out, srcArray, srcSize, ridArray, ridSize, parsedColumnFilter, validMinMax, emptyValue, - nullValue, Min, Max, isNullValueMatches); + nullValue, Min, Max, isNullValueMatches, hasAuxCol, blockAux, emptyValueAux); break; case OT_DATAVALUE: vectorizedFiltering( in, out, srcArray, srcSize, ridArray, ridSize, parsedColumnFilter, validMinMax, emptyValue, - nullValue, Min, Max, isNullValueMatches); + nullValue, Min, Max, isNullValueMatches, hasAuxCol, blockAux, emptyValueAux); break; } } @@ -1651,7 +1933,8 @@ template void filterColumnData(NewColRequestHeader* in, ColResultHeader* out, uint16_t* ridArray, const uint16_t ridSize, // Number of values in ridArray int* srcArray16, const uint32_t srcSize, - boost::shared_ptr parsedColumnFilter) + boost::shared_ptr parsedColumnFilter, + bool hasAuxCol, int* blockAux) { using FT = typename IntegralTypeToFilterType::type; using ST = typename IntegralTypeToFilterSetType::type; @@ -1677,6 +1960,7 @@ void filterColumnData(NewColRequestHeader* in, ColResultHeader* out, uint16_t* r // Bit patterns in srcArray[i] representing EMPTY and NULL values T emptyValue = getEmptyValue(dataType); T nullValue = getNullValue(dataType); + uint8_t emptyValueAux = getEmptyValue(datatypes::SystemCatalog::UTINYINT); // Precompute filter results for NULL values bool isNullValueMatches = @@ -1703,13 +1987,18 @@ void filterColumnData(NewColRequestHeader* in, ColResultHeader* out, uint16_t* r bool canUseFastFiltering = true; for (uint32_t i = 0; i < filterCount; ++i) if (filterRFs[i] != 0) + { canUseFastFiltering = false; + break; + } if (canUseFastFiltering) { vectorizedFilteringDispatcher(in, out, srcArray, srcSize, ridArray, ridSize, parsedColumnFilter.get(), validMinMax, emptyValue, - nullValue, Min, Max, isNullValueMatches); + nullValue, Min, Max, isNullValueMatches, + hasAuxCol, reinterpret_cast(blockAux), + emptyValueAux); return; } } @@ -1718,7 +2007,8 @@ void filterColumnData(NewColRequestHeader* in, ColResultHeader* out, uint16_t* r scalarFiltering(in, out, columnFilterMode, filterSet, filterCount, filterCOPs, filterValues, filterRFs, in->colType, srcArray, srcSize, ridArray, ridSize, initialRID, outputType, validMinMax, emptyValue, nullValue, Min, Max, - isNullValueMatches); + isNullValueMatches, hasAuxCol, reinterpret_cast(blockAux), + emptyValueAux); } // end of filterColumnData } // namespace @@ -1753,7 +2043,9 @@ template ::type* = nullptr> #endif -void PrimitiveProcessor::scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out) +void PrimitiveProcessor::scanAndFilterTypeDispatcher(NewColRequestHeader* in, + ColResultHeader* out, + bool hasAuxCol) { constexpr int W = sizeof(T); auto dataType = (execplan::CalpontSystemCatalog::ColDataType)in->colType.DataType; @@ -1762,10 +2054,10 @@ void PrimitiveProcessor::scanAndFilterTypeDispatcher(NewColRequestHeader* in, Co const uint16_t ridSize = in->NVALS; uint16_t* ridArray = in->getRIDArrayPtr(W); const uint32_t itemsPerBlock = logicalBlockMode ? BLOCK_SIZE : BLOCK_SIZE / W; - filterColumnData(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter); + filterColumnData(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter, hasAuxCol, blockAux); return; } - _scanAndFilterTypeDispatcher(in, out); + _scanAndFilterTypeDispatcher(in, out, hasAuxCol); } template ::type* = nullptr> #endif -void PrimitiveProcessor::scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out) +void PrimitiveProcessor::scanAndFilterTypeDispatcher(NewColRequestHeader* in, + ColResultHeader* out, + bool hasAuxCol) { constexpr int W = sizeof(T); auto dataType = (execplan::CalpontSystemCatalog::ColDataType)in->colType.DataType; @@ -1787,10 +2081,10 @@ void PrimitiveProcessor::scanAndFilterTypeDispatcher(NewColRequestHeader* in, Co const uint16_t ridSize = in->NVALS; uint16_t* ridArray = in->getRIDArrayPtr(W); const uint32_t itemsPerBlock = logicalBlockMode ? BLOCK_SIZE : BLOCK_SIZE / W; - filterColumnData(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter); + filterColumnData(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter, hasAuxCol, blockAux); return; } - _scanAndFilterTypeDispatcher(in, out); + _scanAndFilterTypeDispatcher(in, out, hasAuxCol); } template ::type* = nullptr> #endif -void PrimitiveProcessor::scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out) +void PrimitiveProcessor::scanAndFilterTypeDispatcher(NewColRequestHeader* in, + ColResultHeader* out, + bool hasAuxCol) { - _scanAndFilterTypeDispatcher(in, out); + _scanAndFilterTypeDispatcher(in, out, hasAuxCol); } template ::type* = nullptr> #endif -void PrimitiveProcessor::_scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out) +void PrimitiveProcessor::_scanAndFilterTypeDispatcher(NewColRequestHeader* in, + ColResultHeader* out, + bool hasAuxCol) { constexpr int W = sizeof(T); const uint16_t ridSize = in->NVALS; uint16_t* ridArray = in->getRIDArrayPtr(W); const uint32_t itemsPerBlock = logicalBlockMode ? BLOCK_SIZE : BLOCK_SIZE / W; - filterColumnData(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter); + filterColumnData(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter, hasAuxCol, blockAux); } template ::type* = nullptr> #endif -void PrimitiveProcessor::_scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out) +void PrimitiveProcessor::_scanAndFilterTypeDispatcher(NewColRequestHeader* in, + ColResultHeader* out, + bool hasAuxCol) { constexpr int W = sizeof(T); using UT = typename std::conditional::value || datatypes::is_uint128_t::value, T, @@ -1856,22 +2156,23 @@ void PrimitiveProcessor::_scanAndFilterTypeDispatcher(NewColRequestHeader* in, C dataType == execplan::CalpontSystemCatalog::TEXT) && !isDictTokenScan(in)) { - filterColumnData(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter); + filterColumnData(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter, hasAuxCol, blockAux); return; } if (datatypes::isUnsigned(dataType)) { - filterColumnData(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter); + filterColumnData(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter, hasAuxCol, blockAux); return; } - filterColumnData(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter); + filterColumnData(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter, hasAuxCol, blockAux); } // The entrypoint for block scanning and filtering. // The block is in in msg, out msg is used to store values|RIDs matched. template -void PrimitiveProcessor::columnScanAndFilter(NewColRequestHeader* in, ColResultHeader* out) +void PrimitiveProcessor::columnScanAndFilter(NewColRequestHeader* in, ColResultHeader* out, + bool hasAuxCol) { #ifdef PRIM_DEBUG auto markEvent = [&](char eventChar) @@ -1910,21 +2211,26 @@ void PrimitiveProcessor::columnScanAndFilter(NewColRequestHeader* in, ColResultH // Sort ridArray (the row index array) if there are RIDs with this in msg in->sortRIDArrayIfNeeded(W); - scanAndFilterTypeDispatcher(in, out); + scanAndFilterTypeDispatcher(in, out, hasAuxCol); #ifdef PRIM_DEBUG markEvent('C'); #endif } template void primitives::PrimitiveProcessor::columnScanAndFilter(NewColRequestHeader*, - ColResultHeader*); + ColResultHeader*, + bool); template void primitives::PrimitiveProcessor::columnScanAndFilter(NewColRequestHeader*, - ColResultHeader*); + ColResultHeader*, + bool); template void primitives::PrimitiveProcessor::columnScanAndFilter(NewColRequestHeader*, - ColResultHeader*); + ColResultHeader*, + bool); template void primitives::PrimitiveProcessor::columnScanAndFilter(NewColRequestHeader*, - ColResultHeader*); + ColResultHeader*, + bool); template void primitives::PrimitiveProcessor::columnScanAndFilter(NewColRequestHeader*, - ColResultHeader*); + ColResultHeader*, + bool); -} // namespace primitives \ No newline at end of file +} // namespace primitives diff --git a/primitives/linux-port/primitiveprocessor.h b/primitives/linux-port/primitiveprocessor.h index 98bdd9fee..16a2ba416 100644 --- a/primitives/linux-port/primitiveprocessor.h +++ b/primitives/linux-port/primitiveprocessor.h @@ -294,6 +294,10 @@ class PrimitiveProcessor { block = data; } + void setBlockPtrAux(int* data) + { + blockAux = data; + } void setPMStatsPtr(dbbc::Stats* p) { fStatsPtr = p; @@ -392,20 +396,20 @@ class PrimitiveProcessor template ::type* = nullptr> - void scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out); + void scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out, bool hasAuxCol); template ::type* = nullptr> - void scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out); + void scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out, bool hasAuxCol); template ::type* = nullptr> - void scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out); + void scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out, bool hasAuxCol); template ::type* = nullptr> - void _scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out); + void _scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out, bool hasAuxCol); template ::type* = nullptr> - void _scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out); + void _scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out, bool hasAuxCol); template - void columnScanAndFilter(NewColRequestHeader* in, ColResultHeader* out); + void columnScanAndFilter(NewColRequestHeader* in, ColResultHeader* out, bool hasAuxCol); boost::shared_ptr parseColumnFilter(const uint8_t* filterString, uint32_t colWidth, uint32_t colType, uint32_t filterCount, @@ -444,6 +448,7 @@ class PrimitiveProcessor PrimitiveProcessor& operator=(const PrimitiveProcessor& rhs); int* block; + int* blockAux; bool compare(const datatypes::Charset& cs, uint8_t COP, const char* str1, size_t length1, const char* str2, size_t length2) throw(); diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index 7b0b78534..659792c57 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -147,6 +147,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor() { pp.setLogicalBlockMode(true); pp.setBlockPtr((int*)blockData); + pp.setBlockPtrAux((int*)blockDataAux); pthread_mutex_init(&objLock, NULL); } @@ -206,6 +207,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream& b, double prefetch, pp.setLogicalBlockMode(true); pp.setBlockPtr((int*)blockData); + pp.setBlockPtrAux((int*)blockDataAux); sendThread = bppst; pthread_mutex_init(&objLock, NULL); initBPP(b); @@ -600,8 +602,8 @@ void BatchPrimitiveProcessor::resetBPP(ByteStream& bs, const SP_UM_MUTEX& w, con /* init vars not part of the BS */ currentBlockOffset = 0; - memset(relLBID.get(), 0, sizeof(uint64_t) * (projectCount + 1)); - memset(asyncLoaded.get(), 0, sizeof(bool) * (projectCount + 1)); + memset(relLBID.get(), 0, sizeof(uint64_t) * (projectCount + 2)); + memset(asyncLoaded.get(), 0, sizeof(bool) * (projectCount + 2)); buildVSSCache(count); #ifdef __FreeBSD__ @@ -1156,9 +1158,11 @@ void BatchPrimitiveProcessor::initProcessor() } // @bug 1269, initialize data used by execute() for async loading blocks - // +1 for the scan filter step with no predicate, if any - relLBID.reset(new uint64_t[projectCount + 1]); - asyncLoaded.reset(new bool[projectCount + 1]); + // +2 for: + // 1. the scan filter step with no predicate, if any + // 2. AUX column + relLBID.reset(new uint64_t[projectCount + 2]); + asyncLoaded.reset(new bool[projectCount + 2]); } /* This version does a join on projected rows */ @@ -1468,6 +1472,19 @@ void BatchPrimitiveProcessor::execute() asyncLoaded[p] = true; } + if (col->hasAuxCol()) + { + asyncLoaded[p + 1] = asyncLoaded[p + 1] && (relLBID[p + 1] % blocksReadAhead != 0); + relLBID[p + 1] += 1; + + if (!asyncLoaded[p + 1]) + { + loadBlockAsync(col->getLBIDAux(), versionInfo, txnID, 2, &cachedIO, &physIO, + LBIDTrace, sessionID, &counterLock, &busyLoaderCount, sendThread, &vssCache); + asyncLoaded[p + 1] = true; + } + } + asyncLoadProjectColumns(); } } diff --git a/primitives/primproc/batchprimitiveprocessor.h b/primitives/primproc/batchprimitiveprocessor.h index 292a6cf39..78eabe761 100644 --- a/primitives/primproc/batchprimitiveprocessor.h +++ b/primitives/primproc/batchprimitiveprocessor.h @@ -235,6 +235,7 @@ class BatchPrimitiveProcessor /* Common space for primitive data */ alignas(utils::MAXCOLUMNWIDTH) uint8_t blockData[BLOCK_SIZE * utils::MAXCOLUMNWIDTH]; + uint8_t blockDataAux[BLOCK_SIZE]; boost::scoped_array outputMsg; uint32_t outMsgSize; diff --git a/primitives/primproc/columncommand.cpp b/primitives/primproc/columncommand.cpp index 315d9293e..80a012626 100644 --- a/primitives/primproc/columncommand.cpp +++ b/primitives/primproc/columncommand.cpp @@ -202,6 +202,20 @@ void ColumnCommand::_loadData() bpp->cachedIO += wasCached; bpp->physIO += blocksRead; bpp->touchedBlocks += blocksToLoad; + + if (_hasAuxCol) + { + BRM::LBID_t* lbidsAux = (BRM::LBID_t*)alloca(1 * sizeof(BRM::LBID_t)); + uint8_t** blockPtrsAux = (uint8_t**)alloca(1 * sizeof(uint8_t*)); + blockPtrsAux[0] = &bpp->blockDataAux[0]; + lbidsAux[0] = lbidAux; + wasCached = primitiveprocessor::loadBlocks(lbidsAux, bpp->versionInfo, bpp->txnID, 2, + blockPtrsAux, &blocksRead, bpp->LBIDTrace, bpp->sessionID, + 1, &wasVersioned, true, &bpp->vssCache); + bpp->cachedIO += wasCached; + bpp->physIO += blocksRead; + bpp->touchedBlocks += 1; + } } void ColumnCommand::loadData() @@ -276,7 +290,7 @@ void ColumnCommand::_issuePrimitive() using IntegralType = typename datatypes::WidthToSIntegralType::type; // Down the call stack the code presumes outMsg buffer has enough space to store // ColRequestHeader + uint16_t Rids[8192] + IntegralType[8192]. - bpp->getPrimitiveProcessor().columnScanAndFilter(primMsg, outMsg); + bpp->getPrimitiveProcessor().columnScanAndFilter(primMsg, outMsg, _hasAuxCol); } // _issuePrimitive() void ColumnCommand::updateCPDataNarrow() @@ -526,6 +540,8 @@ void ColumnCommand::createCommand(ByteStream& bs) bs >> filterString; bs >> BOP; bs >> filterCount; + bs >> tmp8; + _hasAuxCol = tmp8; deserializeInlineVector(bs, lastLbid); Command::createCommand(bs); @@ -558,6 +574,8 @@ void ColumnCommand::createCommand(execplan::CalpontSystemCatalog::ColType& aColT bs >> filterString; bs >> BOP; bs >> filterCount; + bs >> tmp8; + _hasAuxCol = tmp8; deserializeInlineVector(bs, lastLbid); Command::createCommand(bs); @@ -566,6 +584,9 @@ void ColumnCommand::createCommand(execplan::CalpontSystemCatalog::ColType& aColT void ColumnCommand::resetCommand(ByteStream& bs) { bs >> lbid; + + if (_hasAuxCol) + bs >> lbidAux; } void ColumnCommand::prep(int8_t outputType, bool absRids) @@ -826,11 +847,15 @@ void ColumnCommand::projectIntoRowGroup(RowGroup& rg, uint32_t pos) void ColumnCommand::nextLBID() { lbid += colType.colWidth; + + if (_hasAuxCol) + lbidAux += 1; } void ColumnCommand::duplicate(ColumnCommand* cc) { cc->_isScan = _isScan; + cc->_hasAuxCol = _hasAuxCol; cc->traceFlags = traceFlags; cc->filterString = filterString; cc->colType.colDataType = colType.colDataType; @@ -862,6 +887,9 @@ bool ColumnCommand::operator==(const ColumnCommand& cc) const if (_isScan != cc._isScan) return false; + if (_hasAuxCol != cc._hasAuxCol) + return false; + if (BOP != cc.BOP) return false; @@ -891,6 +919,7 @@ bool ColumnCommand::operator!=(const ColumnCommand& cc) const ColumnCommand& ColumnCommand::operator=(const ColumnCommand& c) { _isScan = c._isScan; + _hasAuxCol = c._hasAuxCol; traceFlags = c.traceFlags; filterString = c.filterString; colType.colDataType = c.colType.colDataType; @@ -928,6 +957,15 @@ void ColumnCommand::getLBIDList(uint32_t loopCount, vector* lbids) for (i = firstLBID; i <= lastLBID; i++) lbids->push_back(i); + + if (_hasAuxCol) + { + firstLBID = lbidAux; + lastLBID = firstLBID + (loopCount * 1) - 1; + + for (i = firstLBID; i <= lastLBID; i++) + lbids->push_back(i); + } } int64_t ColumnCommand::getLastLbid() diff --git a/primitives/primproc/columncommand.h b/primitives/primproc/columncommand.h index 518374ff0..070655b58 100644 --- a/primitives/primproc/columncommand.h +++ b/primitives/primproc/columncommand.h @@ -76,6 +76,16 @@ class ColumnCommand : public Command { return _isScan; } + + bool hasAuxCol() const + { + return _hasAuxCol; + } + uint64_t getLBIDAux() const + { + return lbidAux; + } + void createCommand(messageqcpp::ByteStream&); void createCommand(execplan::CalpontSystemCatalog::ColType& aColType, messageqcpp::ByteStream&); void resetCommand(messageqcpp::ByteStream&); @@ -163,6 +173,8 @@ class ColumnCommand : public Command uint32_t baseMsgLength; uint64_t lbid; + bool _hasAuxCol; + uint64_t lbidAux; uint32_t traceFlags; // probably move this to Command uint8_t BOP; messageqcpp::ByteStream filterString; diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index a280c4369..ede1ce1f3 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -1162,6 +1162,7 @@ int DictScanJob::operator()() loadBlock(cmd->LBID, verInfo, cmd->Hdr.TransactionID, cmd->CompType, data, &wasBlockInCache, &blocksRead, fLBIDTraceOn, session); + // TODO MCOL-5021 pproc.setBlockPtr((int*)data); pproc.p_TokenByScan(cmd, output, output_buf_size, eqFilter);