diff --git a/dbcon/ddlpackageproc/createtableprocessor.cpp b/dbcon/ddlpackageproc/createtableprocessor.cpp index 3d3bf4214..aebfb094b 100644 --- a/dbcon/ddlpackageproc/createtableprocessor.cpp +++ b/dbcon/ddlpackageproc/createtableprocessor.cpp @@ -274,9 +274,8 @@ keepGoing: cout << fTxnid.id << " Create table allocOIDs got the starting oid " << fStartingColOID << endl; #endif - uint32_t size = numColumns + numDictCols; - idbassert(size > 0); - size += 1; // MCOL-5021 + uint32_t numColumnOids = numColumns + numDictCols; + numColumnOids += 1; // MCOL-5021 if (fStartingColOID < 0) { @@ -300,7 +299,7 @@ keepGoing: bytestream << (uint32_t)createTableStmt.fSessionID; bytestream << (uint32_t)txnID.id; bytestream << (uint32_t)fStartingColOID; - bytestream << (uint32_t)(fStartingColOID + size); + bytestream << (uint32_t)(fStartingColOID + numColumnOids); bytestream << (uint32_t)createTableStmt.fTableWithAutoi; uint16_t dbRoot; BRM::OID_t sysOid = 1001; @@ -543,7 +542,7 @@ keepGoing: bytestream << (ByteStream::byte)WE_SVR_WRITE_CREATETABLEFILES; bytestream << uniqueId; bytestream << (uint32_t)txnID.id; - bytestream << size; + bytestream << numColumnOids; unsigned colNum = 0; unsigned dictNum = 0; @@ -607,7 +606,7 @@ keepGoing: ++iter; } - bytestream << (fStartingColOID + size); + bytestream << (fStartingColOID + numColumnOids); bytestream << (uint8_t)execplan::AUX_COL_DATATYPE; bytestream << (uint8_t) false; bytestream << (uint32_t)execplan::AUX_COL_WIDTH; @@ -630,7 +629,7 @@ keepGoing: } // MCOL-5021 - oidList.push_back(fStartingColOID + size); + oidList.push_back(fStartingColOID + numColumnOids); try { @@ -699,9 +698,9 @@ keepGoing: bytestream.restart(); bytestream << (ByteStream::byte)WE_SVR_WRITE_DROPFILES; bytestream << uniqueId; - bytestream << (uint32_t)size; + bytestream << (uint32_t)numColumnOids; - for (unsigned i = 0; i < size; i++) + for (unsigned i = 0; i < numColumnOids; i++) { bytestream << (uint32_t)(fStartingColOID + i + 1); } diff --git a/dbcon/execplan/calpontsystemcatalog.cpp b/dbcon/execplan/calpontsystemcatalog.cpp index 54f8953fe..332a076f0 100644 --- a/dbcon/execplan/calpontsystemcatalog.cpp +++ b/dbcon/execplan/calpontsystemcatalog.cpp @@ -3662,20 +3662,23 @@ CalpontSystemCatalog::OID CalpontSystemCatalog::tableAUXColumnOID(const TableNam CalpontSelectExecutionPlan::FilterTokenList filterTokenList; CalpontSelectExecutionPlan::ColumnMap colMap; + static const std::string sysCatSchemaTablePrefix = + CALPONT_SCHEMA + "." + SYSTABLE_TABLE + "."; + SimpleColumn* c1 = - new SimpleColumn(CALPONT_SCHEMA + "." + SYSTABLE_TABLE + "." + AUXCOLUMNOID_COL, fSessionID); + new SimpleColumn(sysCatSchemaTablePrefix + AUXCOLUMNOID_COL, fSessionID); SimpleColumn* c2 = - new SimpleColumn(CALPONT_SCHEMA + "." + SYSTABLE_TABLE + "." + SCHEMA_COL, fSessionID); + new SimpleColumn(sysCatSchemaTablePrefix + SCHEMA_COL, fSessionID); SimpleColumn* c3 = - new SimpleColumn(CALPONT_SCHEMA + "." + SYSTABLE_TABLE + "." + TABLENAME_COL, fSessionID); + new SimpleColumn(sysCatSchemaTablePrefix + TABLENAME_COL, fSessionID); SRCP srcp; srcp.reset(c1); - colMap.insert(CMVT_(CALPONT_SCHEMA + "." + SYSTABLE_TABLE + "." + AUXCOLUMNOID_COL, srcp)); + colMap.insert(CMVT_(sysCatSchemaTablePrefix + AUXCOLUMNOID_COL, srcp)); srcp.reset(c2); - colMap.insert(CMVT_(CALPONT_SCHEMA + "." + SYSTABLE_TABLE + "." + SCHEMA_COL, srcp)); + colMap.insert(CMVT_(sysCatSchemaTablePrefix + SCHEMA_COL, srcp)); srcp.reset(c3); - colMap.insert(CMVT_(CALPONT_SCHEMA + "." + SYSTABLE_TABLE + "." + TABLENAME_COL, srcp)); + colMap.insert(CMVT_(sysCatSchemaTablePrefix + TABLENAME_COL, srcp)); csep.columnMapNonStatic(colMap); srcp.reset(c1->clone()); @@ -3771,15 +3774,18 @@ CalpontSystemCatalog::OID CalpontSystemCatalog::isAUXColumnOID(const OID& oid) CalpontSelectExecutionPlan::FilterTokenList filterTokenList; CalpontSelectExecutionPlan::ColumnMap colMap; + static const std::string sysCatSchemaTablePrefix = + CALPONT_SCHEMA + "." + SYSTABLE_TABLE + "."; + SimpleColumn* c1 = - new SimpleColumn(CALPONT_SCHEMA + "." + SYSTABLE_TABLE + "." + OBJECTID_COL, fSessionID); + new SimpleColumn(sysCatSchemaTablePrefix + OBJECTID_COL, fSessionID); SimpleColumn* c2 = - new SimpleColumn(CALPONT_SCHEMA + "." + SYSTABLE_TABLE + "." + AUXCOLUMNOID_COL, fSessionID); + new SimpleColumn(sysCatSchemaTablePrefix + AUXCOLUMNOID_COL, fSessionID); SRCP srcp; srcp.reset(c1); - colMap.insert(CMVT_(CALPONT_SCHEMA + "." + SYSTABLE_TABLE + "." + OBJECTID_COL, srcp)); + colMap.insert(CMVT_(sysCatSchemaTablePrefix + OBJECTID_COL, srcp)); srcp.reset(c2); - colMap.insert(CMVT_(CALPONT_SCHEMA + "." + SYSTABLE_TABLE + "." + AUXCOLUMNOID_COL, srcp)); + colMap.insert(CMVT_(sysCatSchemaTablePrefix + AUXCOLUMNOID_COL, srcp)); csep.columnMapNonStatic(colMap); srcp.reset(c1->clone()); diff --git a/dbcon/joblist/columncommand-jl.cpp b/dbcon/joblist/columncommand-jl.cpp index 2418554b6..48f82aec3 100644 --- a/dbcon/joblist/columncommand-jl.cpp +++ b/dbcon/joblist/columncommand-jl.cpp @@ -265,6 +265,11 @@ void ColumnCommandJL::setLBID(uint64_t rid, uint32_t dbRoot) } } + if (i == extents.size()) + { + throw logic_error("ColumnCommandJL: setLBID didn't find the extent for the rid."); + } + uint32_t j; for (j = 0; j < extentsAux.size(); j++) @@ -277,7 +282,7 @@ void ColumnCommandJL::setLBID(uint64_t rid, uint32_t dbRoot) } } - if (i == extents.size() || (hasAuxCol && j == extentsAux.size())) + if (hasAuxCol && j == extentsAux.size()) { throw logic_error("ColumnCommandJL: setLBID didn't find the extent for the rid."); } diff --git a/dbcon/joblist/primitivemsg.h b/dbcon/joblist/primitivemsg.h index fe0c52ec2..93413c1ce 100644 --- a/dbcon/joblist/primitivemsg.h +++ b/dbcon/joblist/primitivemsg.h @@ -717,6 +717,7 @@ struct NewColRequestHeader uint16_t NOPS; uint16_t NVALS; uint8_t sort; // 1 to sort + bool hasAuxCol; // this follows the header // ColArgs ArgList[NOPS] (where the val field is DataSize bytes long) // uint16_t Rids[NVALS] (each rid is relative to the given block) diff --git a/dbcon/joblist/tuple-bps.cpp b/dbcon/joblist/tuple-bps.cpp index 6db872004..a0035f677 100644 --- a/dbcon/joblist/tuple-bps.cpp +++ b/dbcon/joblist/tuple-bps.cpp @@ -579,7 +579,6 @@ TupleBPS::TupleBPS(const pColScanStep& rhs, const JobInfo& jobInfo) : BatchPrimi sort(extentsAux.begin(), extentsAux.end(), BRM::ExtentSorter()); - extentsMap[fOidAux] = tr1::unordered_map(); tr1::unordered_map& refAux = extentsMap[fOidAux]; for (uint32_t z = 0; z < extentsAux.size(); z++) @@ -589,7 +588,6 @@ TupleBPS::TupleBPS(const pColScanStep& rhs, const JobInfo& jobInfo) : BatchPrimi /* These lines are obsoleted by initExtentMarkers. Need to remove & retest. */ scannedExtents = rhs.extents; - extentsMap[fOid] = tr1::unordered_map(); tr1::unordered_map& ref = extentsMap[fOid]; for (uint32_t z = 0; z < rhs.extents.size(); z++) diff --git a/oam/etc/Columnstore.xml b/oam/etc/Columnstore.xml index d59795def..7774ccc0d 100644 --- a/oam/etc/Columnstore.xml +++ b/oam/etc/Columnstore.xml @@ -137,7 +137,7 @@ /var/lib/columnstore/data1/systemFiles/bulkRollback 98 1 - false + n 1 diff --git a/primitives/linux-port/column.cpp b/primitives/linux-port/column.cpp index 72d36a0bf..211a77f72 100644 --- a/primitives/linux-port/column.cpp +++ b/primitives/linux-port/column.cpp @@ -136,6 +136,44 @@ inline MT getNonEmptyMask16Byte(MT* nonEmptyMaskAux, uint16_t iter) return nonEmptyMask16Byte[(nonEmptyMaskAux[iter >> 4] >> (iter & 0x000F)) & 0x0001]; } +typedef MT (*getNonEmptyMaskPtrT)(MT*, uint16_t); + +template +constexpr getNonEmptyMaskPtrT getNonEmptyMaskPtrTemplate() +{ + return nullptr; +} + +template<> +constexpr getNonEmptyMaskPtrT getNonEmptyMaskPtrTemplate<1>() +{ + return getNonEmptyMask1Byte; +} + +template<> +constexpr getNonEmptyMaskPtrT getNonEmptyMaskPtrTemplate<2>() +{ + return getNonEmptyMask2Byte; +} + +template<> +constexpr getNonEmptyMaskPtrT getNonEmptyMaskPtrTemplate<4>() +{ + return getNonEmptyMask4Byte; +} + +template<> +constexpr getNonEmptyMaskPtrT getNonEmptyMaskPtrTemplate<8>() +{ + return getNonEmptyMask8Byte; +} + +template<> +constexpr getNonEmptyMaskPtrT getNonEmptyMaskPtrTemplate<16>() +{ + return getNonEmptyMask16Byte; +} + inline uint64_t order_swap(uint64_t x) { uint64_t ret = (x >> 56) | ((x << 40) & 0x00FF000000000000ULL) | ((x << 24) & 0x0000FF0000000000ULL) | @@ -909,24 +947,22 @@ T getInitialMax(NewColRequestHeader* in) // Return true on success, false on End of Block. // Values are read from srcArray either in natural order or in the order defined by ridArray. // Empty values are skipped, unless ridArray==0 && !(OutputType & OT_RID). -template +template inline bool nextColValue( T& result, // Place for the value returned bool* isEmpty, // ... and flag whether it's EMPTY - uint32_t* - index, // Successive index either in srcArray (going from 0 to srcSize-1) or ridArray (0..ridSize-1) + uint32_t* index, // Successive index either in srcArray (going from 0 to srcSize-1) or ridArray (0..ridSize-1) uint16_t* rid, // Index in srcArray of the value returned const T* srcArray, // Input array const uint32_t srcSize, // ... and its size const uint16_t* ridArray, // Optional array of indexes into srcArray, that defines the read order const uint16_t ridSize, // ... and its size const uint8_t OutputType, // Used to decide whether to skip EMPTY values - T EMPTY_VALUE, + const T& EMPTY_VALUE, const uint8_t* blockAux, uint8_t EMPTY_VALUE_AUX) { auto i = *index; // local copy of *index to speed up loops - T value; // value to be written into *result, local for the same reason if (ridArray) { @@ -936,10 +972,20 @@ inline bool nextColValue( if (UNLIKELY(i >= ridSize)) return false; - value = srcArray[ridArray[i]]; + if constexpr (IS_AUX_COLUMN) + { + uint8_t valueAux = blockAux[ridArray[i]]; - if (value != EMPTY_VALUE) - break; + if (valueAux != EMPTY_VALUE_AUX) + break; + } + else + { + T value = srcArray[ridArray[i]]; + + if (value != EMPTY_VALUE) + break; + } } *rid = ridArray[i]; @@ -953,10 +999,20 @@ inline bool nextColValue( if (UNLIKELY(i >= srcSize)) return false; - value = srcArray[i]; + if constexpr (IS_AUX_COLUMN) + { + uint8_t valueAux = blockAux[i]; - if (value != EMPTY_VALUE) - break; + if (valueAux != EMPTY_VALUE_AUX) + break; + } + else + { + T value = srcArray[i]; + + if (value != EMPTY_VALUE) + break; + } } *rid = i; @@ -969,77 +1025,17 @@ inline bool nextColValue( return false; *rid = i; - value = srcArray[i]; - *isEmpty = (value == EMPTY_VALUE); - } - *index = i + 1; - result = value; - return true; -} - -template -inline bool nextColValueAux( - T& result, // Place for the value returned - bool* isEmpty, // ... and flag whether it's EMPTY - uint32_t* - index, // Successive index either in srcArray (going from 0 to srcSize-1) or ridArray (0..ridSize-1) - uint16_t* rid, // Index in srcArray of the value returned - const T* srcArray, // Input array - const uint32_t srcSize, // ... and its size - const uint16_t* ridArray, // Optional array of indexes into srcArray, that defines the read order - const uint16_t ridSize, // ... and its size - const uint8_t OutputType, // Used to decide whether to skip EMPTY values - T EMPTY_VALUE, - const uint8_t* blockAux, - uint8_t EMPTY_VALUE_AUX) -{ - auto i = *index; // local copy of *index to speed up loops - uint8_t valueAux; - - if (ridArray) - { - // Read next non-empty value in the order defined by ridArray - for (;; i++) + if constexpr (IS_AUX_COLUMN) { - if (UNLIKELY(i >= ridSize)) - return false; - - valueAux = blockAux[ridArray[i]]; - - if (valueAux != EMPTY_VALUE_AUX) - break; + uint8_t valueAux = blockAux[i]; + *isEmpty = (valueAux == EMPTY_VALUE_AUX); } - - *rid = ridArray[i]; - *isEmpty = false; - } - else if (OutputType & OT_RID) // TODO: check correctness of this condition for SKIP_EMPTY_VALUES - { - // Read next non-empty value in the natural order - for (;; i++) + else { - if (UNLIKELY(i >= srcSize)) - return false; - - valueAux = blockAux[i]; - - if (valueAux != EMPTY_VALUE_AUX) - break; + T value = srcArray[i]; + *isEmpty = (value == EMPTY_VALUE); } - - *rid = i; - *isEmpty = false; - } - else - { - // Read next value in the natural order - if (UNLIKELY(i >= srcSize)) - return false; - - *rid = i; - valueAux = blockAux[i]; - *isEmpty = (valueAux == EMPTY_VALUE_AUX); } *index = i + 1; @@ -1330,7 +1326,7 @@ void scalarFiltering( T emptyValue, // Deduced empty value magic T nullValue, // Deduced null value magic T Min, T Max, const bool isNullValueMatches, - const bool hasAuxCol, const uint8_t* blockAux, + const uint8_t* blockAux, uint8_t emptyValueAux) { constexpr int WIDTH = sizeof(T); @@ -1339,7 +1335,7 @@ void scalarFiltering( primitives::RIDType rid = 0; bool isEmpty = false; - auto nextColValuePtr = hasAuxCol ? nextColValueAux : nextColValue; + auto nextColValuePtr = in->hasAuxCol ? nextColValue : nextColValue; // Loop over the column values, storing those matching the filter, and updating the min..max range for (uint32_t i = initialRID; (*nextColValuePtr)(curValue, &isEmpty, &i, &rid, srcArray, srcSize, @@ -1489,6 +1485,34 @@ void extractTextMinMax(VT& simdProcessor, SimdType simdMin, SimdType simdMax, Si max = simdMaxVec[indMax - weightsMaxVec]; } +template +void buildAuxColEmptyVal(const uint16_t iterNumberAux, const uint16_t vectorSizeAux, + const uint8_t** blockAux, uint8_t emptyValueAux, + MT** nonEmptyMaskAux, primitives::RIDType** ridArray) +{ + using SimdTypeTemp = typename simd::IntegralToSIMD::type; + using FilterTypeTemp = typename simd::StorageToFiltering::type; + using VTAux = typename simd::SimdFilterProcessor; + using SimdTypeAux = typename VTAux::SimdType; + using SimdWrapperTypeAux = typename VTAux::SimdWrapperType; + VTAux simdProcessorAux; + SimdTypeAux dataVecAux; + SimdTypeAux emptyFilterArgVecAux = simdProcessorAux.emptyNullLoadValue(emptyValueAux); + const uint8_t* origBlockAux = *blockAux; + primitives::RIDType* origRidArray = *ridArray; + + for (uint16_t i = 0; i < iterNumberAux; ++i) + { + dataVecAux = simdDataLoad(simdProcessorAux, *blockAux, + origBlockAux, *ridArray, i).v; + (*nonEmptyMaskAux)[i] = simdProcessorAux.nullEmptyCmpNe(dataVecAux, emptyFilterArgVecAux); + *blockAux += vectorSizeAux; + *ridArray += vectorSizeAux; + } + + *ridArray = origRidArray; +} + // This routine filters input block in a vectorized manner. // It supports all output types, all input types. // It doesn't support KIND==TEXT so upper layers filters this KIND out beforehand. @@ -1499,12 +1523,12 @@ void extractTextMinMax(VT& simdProcessor, SimdType simdMin, SimdType simdMax, Si // Then it takes a vector of data, run filters and logical function using pointers. // See the corresponding dispatcher to get more details on vector processing class. template -void vectorizedFiltering(NewColRequestHeader* in, ColResultHeader* out, const T* srcArray, - const uint32_t srcSize, primitives::RIDType* ridArray, const uint16_t ridSize, - ParsedColumnFilter* parsedColumnFilter, const bool validMinMax, const T emptyValue, - const T nullValue, T min, T max, const bool isNullValueMatches, - const bool hasAuxCol, const uint8_t* blockAux, uint8_t emptyValueAux) + ENUM_KIND KIND, typename FT, typename ST, bool IS_AUX_COLUMN> +void vectorizedFiltering_(NewColRequestHeader* in, ColResultHeader* out, const T* srcArray, + const uint32_t srcSize, primitives::RIDType* ridArray, const uint16_t ridSize, + ParsedColumnFilter* parsedColumnFilter, const bool validMinMax, const T emptyValue, + const T nullValue, T min, T max, const bool isNullValueMatches, + const uint8_t* blockAux, uint8_t emptyValueAux) { constexpr const uint16_t WIDTH = sizeof(T); using SimdType = typename VT::SimdType; @@ -1516,7 +1540,7 @@ void vectorizedFiltering(NewColRequestHeader* in, ColResultHeader* out, const T* SimdType dataVec; [[maybe_unused]] SimdType swapedOrderDataVec; [[maybe_unused]] auto typeHolder = in->colType; - SimdType emptyFilterArgVec = simdProcessor.emptyNullLoadValue(emptyValue); + [[maybe_unused]] SimdType emptyFilterArgVec = simdProcessor.emptyNullLoadValue(emptyValue); SimdType nullFilterArgVec = simdProcessor.emptyNullLoadValue(nullValue); MT writeMask, nonEmptyMask, nonNullMask, nonNullOrEmptyMask; MT initFilterMask = 0xFFFF; @@ -1637,177 +1661,77 @@ void vectorizedFiltering(NewColRequestHeader* in, ColResultHeader* out, const T* weightsMax = simdSwapedOrderDataLoad(typeHolder, simdProcessor, simdMax).v; } - if (hasAuxCol) + MT* nonEmptyMaskAux; + + if constexpr (IS_AUX_COLUMN) { - using SimdTypeTemp = typename simd::IntegralToSIMD::type; - using FilterTypeTemp = typename simd::StorageToFiltering::type; - using VTAux = typename simd::SimdFilterProcessor; - using SimdTypeAux = typename VTAux::SimdType; - using SimdWrapperTypeAux = typename VTAux::SimdWrapperType; - VTAux simdProcessorAux; - SimdTypeAux dataVecAux; - SimdTypeAux emptyFilterArgVecAux = simdProcessorAux.emptyNullLoadValue(emptyValueAux); - const uint8_t* origBlockAux = blockAux; - constexpr uint16_t VECTOR_SIZE_AUX = VT::vecByteSize; - uint16_t iterNumberAux = HAS_INPUT_RIDS ? ridSize / VECTOR_SIZE_AUX : srcSize / VECTOR_SIZE_AUX; - MT* nonEmptyMaskAux = (MT*) alloca(sizeof(MT) * iterNumberAux); - primitives::RIDType* origRidArray = ridArray; - - for (uint16_t i = 0; i < iterNumberAux; ++i) - { - dataVecAux = simdDataLoad(simdProcessorAux, blockAux, - origBlockAux, ridArray, i).v; - nonEmptyMaskAux[i] = simdProcessorAux.nullEmptyCmpNe(dataVecAux, emptyFilterArgVecAux); - blockAux += VECTOR_SIZE_AUX; - ridArray += VECTOR_SIZE_AUX; - } - - ridArray = origRidArray; - - MT (*getNonEmptyMaskPtr)(MT*, uint16_t); - - switch(WIDTH) - { - case 1: - getNonEmptyMaskPtr = getNonEmptyMask1Byte; - break; - case 2: - getNonEmptyMaskPtr = getNonEmptyMask2Byte; - break; - case 4: - getNonEmptyMaskPtr = getNonEmptyMask4Byte; - break; - case 8: - getNonEmptyMaskPtr = getNonEmptyMask8Byte; - break; - case 16: - getNonEmptyMaskPtr = getNonEmptyMask16Byte; - break; - } - - // main loop - // writeMask tells which values must get into the result. Includes values that matches filters. Can have - // NULLs. nonEmptyMask tells which vector coords are not EMPTY magics. nonNullMask tells which vector coords - // are not NULL magics. - for (uint16_t i = 0; i < iterNumber; ++i) - { - primitives::RIDType ridOffset = i * VECTOR_SIZE; - assert(!HAS_INPUT_RIDS || (HAS_INPUT_RIDS && ridSize >= ridOffset)); - dataVec = simdDataLoad(simdProcessor, srcArray, - origSrcArray, ridArray, i).v; - if constexpr(KIND==KIND_TEXT) - swapedOrderDataVec = simdSwapedOrderDataLoad(typeHolder, simdProcessor, dataVec).v; - nonEmptyMask = (*getNonEmptyMaskPtr)(nonEmptyMaskAux, i); - writeMask = nonEmptyMask; - // NULL check - nonNullMask = simdProcessor.nullEmptyCmpNe(dataVec, nullFilterArgVec); - // Exclude NULLs from the resulting set if NULL doesn't match the filters. - writeMask = isNullValueMatches ? writeMask : writeMask & nonNullMask; - nonNullOrEmptyMask = nonNullMask & nonEmptyMask; - // filters - MT prevFilterMask = initFilterMask; - // TODO name this mask literal - MT filterMask = 0xFFFF; - for (uint32_t j = 0; j < filterCount; ++j) - { - // filter using compiled filter and preloaded filter argument - if constexpr(KIND==KIND_TEXT) - filterMask = copFunctorVec[j](simdProcessor, swapedOrderDataVec, filterArgsVectors[j]); - else - filterMask = copFunctorVec[j](simdProcessor, dataVec, filterArgsVectors[j]); - - filterMask = bopFunctor(prevFilterMask, filterMask); - prevFilterMask = filterMask; - } - writeMask = writeMask & filterMask; - - T* dataVecTPtr = reinterpret_cast(&dataVec); - - // vectWriteColValues iterates over the values in the source vec - // to store values/RIDs into dstArray/ridDstArray. - // It also sets min/max values for the block if eligible. - // !!! vectWriteColValues increases ridDstArray internally but it doesn't go - // outside the scope of the memory allocated to out msg. - // vectWriteColValues is empty if outputMode == OT_RID. - uint16_t valuesWritten = vectWriteColValues( - simdProcessor, writeMask, nonNullOrEmptyMask, validMinMax, ridOffset, dataVecTPtr, dstArray, min, max, - in, out, ridDstArray, ridArray); - // Some outputType modes saves RIDs also. vectWriteRIDValues is empty for - // OT_DATAVALUE, OT_BOTH(vectWriteColValues takes care about RIDs). - valuesWritten = vectWriteRIDValues( - simdProcessor, valuesWritten, validMinMax, ridOffset, dataVecTPtr, ridDstArray, writeMask, min, max, - in, out, nonNullOrEmptyMask, ridArray); - - if constexpr (KIND != KIND_TEXT) - vectorizedUpdateMinMax(validMinMax, nonNullOrEmptyMask, simdProcessor, dataVec, simdMin, simdMax); - else - vectorizedTextUpdateMinMax(validMinMax, nonNullOrEmptyMask, simdProcessor, dataVec, simdMin, simdMax, - swapedOrderDataVec, weightsMin, weightsMax); - - // Calculate bytes written - uint16_t bytesWritten = valuesWritten * WIDTH; - totalValuesWritten += valuesWritten; - ridDstArray += valuesWritten; - dstArray += bytesWritten; - rid += VECTOR_SIZE; - srcArray += VECTOR_SIZE; - ridArray += VECTOR_SIZE; - } + constexpr uint16_t vectorSizeAux = VT::vecByteSize; + uint16_t iterNumberAux = HAS_INPUT_RIDS ? ridSize / vectorSizeAux : srcSize / vectorSizeAux; + nonEmptyMaskAux = (MT*) alloca(sizeof(MT) * iterNumberAux); + buildAuxColEmptyVal(iterNumberAux, vectorSizeAux, &blockAux, emptyValueAux, + &nonEmptyMaskAux, &ridArray); } - else + + constexpr getNonEmptyMaskPtrT getNonEmptyMaskPtr = getNonEmptyMaskPtrTemplate(); + + // main loop + // writeMask tells which values must get into the result. Includes values that matches filters. Can have + // NULLs. nonEmptyMask tells which vector coords are not EMPTY magics. nonNullMask tells which vector coords + // are not NULL magics. + for (uint16_t i = 0; i < iterNumber; ++i) { - // main loop - // writeMask tells which values must get into the result. Includes values that matches filters. Can have - // NULLs. nonEmptyMask tells which vector coords are not EMPTY magics. nonNullMask tells which vector coords - // are not NULL magics. - for (uint16_t i = 0; i < iterNumber; ++i) - { - primitives::RIDType ridOffset = i * VECTOR_SIZE; - assert(!HAS_INPUT_RIDS || (HAS_INPUT_RIDS && ridSize >= ridOffset)); - dataVec = simdDataLoad(simdProcessor, srcArray, - origSrcArray, ridArray, i).v; - if constexpr(KIND==KIND_TEXT) - swapedOrderDataVec = simdSwapedOrderDataLoad(typeHolder, simdProcessor, dataVec).v; + primitives::RIDType ridOffset = i * VECTOR_SIZE; + assert(!HAS_INPUT_RIDS || (HAS_INPUT_RIDS && ridSize >= ridOffset)); + dataVec = simdDataLoad(simdProcessor, srcArray, + origSrcArray, ridArray, i).v; + + if constexpr(KIND==KIND_TEXT) + swapedOrderDataVec = simdSwapedOrderDataLoad(typeHolder, simdProcessor, dataVec).v; + + if constexpr (IS_AUX_COLUMN) + nonEmptyMask = (*getNonEmptyMaskPtr)(nonEmptyMaskAux, i); + else nonEmptyMask = simdProcessor.nullEmptyCmpNe(dataVec, emptyFilterArgVec); - writeMask = nonEmptyMask; - // NULL check - nonNullMask = simdProcessor.nullEmptyCmpNe(dataVec, nullFilterArgVec); - // Exclude NULLs from the resulting set if NULL doesn't match the filters. - writeMask = isNullValueMatches ? writeMask : writeMask & nonNullMask; - nonNullOrEmptyMask = nonNullMask & nonEmptyMask; - // filters - MT prevFilterMask = initFilterMask; - // TODO name this mask literal - MT filterMask = 0xFFFF; - for (uint32_t j = 0; j < filterCount; ++j) - { - // filter using compiled filter and preloaded filter argument - if constexpr(KIND==KIND_TEXT) - filterMask = copFunctorVec[j](simdProcessor, swapedOrderDataVec, filterArgsVectors[j]); - else - filterMask = copFunctorVec[j](simdProcessor, dataVec, filterArgsVectors[j]); - filterMask = bopFunctor(prevFilterMask, filterMask); - prevFilterMask = filterMask; - } - writeMask = writeMask & filterMask; + writeMask = nonEmptyMask; + // NULL check + nonNullMask = simdProcessor.nullEmptyCmpNe(dataVec, nullFilterArgVec); + // Exclude NULLs from the resulting set if NULL doesn't match the filters. + writeMask = isNullValueMatches ? writeMask : writeMask & nonNullMask; + nonNullOrEmptyMask = nonNullMask & nonEmptyMask; + // filters + MT prevFilterMask = initFilterMask; + // TODO name this mask literal + MT filterMask = 0xFFFF; + for (uint32_t j = 0; j < filterCount; ++j) + { + // filter using compiled filter and preloaded filter argument + if constexpr(KIND==KIND_TEXT) + filterMask = copFunctorVec[j](simdProcessor, swapedOrderDataVec, filterArgsVectors[j]); + else + filterMask = copFunctorVec[j](simdProcessor, dataVec, filterArgsVectors[j]); - T* dataVecTPtr = reinterpret_cast(&dataVec); + filterMask = bopFunctor(prevFilterMask, filterMask); + prevFilterMask = filterMask; + } + writeMask = writeMask & filterMask; - // vectWriteColValues iterates over the values in the source vec - // to store values/RIDs into dstArray/ridDstArray. - // It also sets min/max values for the block if eligible. - // !!! vectWriteColValues increases ridDstArray internally but it doesn't go - // outside the scope of the memory allocated to out msg. - // vectWriteColValues is empty if outputMode == OT_RID. - uint16_t valuesWritten = vectWriteColValues( - simdProcessor, writeMask, nonNullOrEmptyMask, validMinMax, ridOffset, dataVecTPtr, dstArray, min, max, - in, out, ridDstArray, ridArray); - // Some outputType modes saves RIDs also. vectWriteRIDValues is empty for - // OT_DATAVALUE, OT_BOTH(vectWriteColValues takes care about RIDs). - valuesWritten = vectWriteRIDValues( - simdProcessor, valuesWritten, validMinMax, ridOffset, dataVecTPtr, ridDstArray, writeMask, min, max, - in, out, nonNullOrEmptyMask, ridArray); + T* dataVecTPtr = reinterpret_cast(&dataVec); + + // vectWriteColValues iterates over the values in the source vec + // to store values/RIDs into dstArray/ridDstArray. + // It also sets min/max values for the block if eligible. + // !!! vectWriteColValues increases ridDstArray internally but it doesn't go + // outside the scope of the memory allocated to out msg. + // vectWriteColValues is empty if outputMode == OT_RID. + uint16_t valuesWritten = vectWriteColValues( + simdProcessor, writeMask, nonNullOrEmptyMask, validMinMax, ridOffset, dataVecTPtr, dstArray, min, max, + in, out, ridDstArray, ridArray); + // Some outputType modes saves RIDs also. vectWriteRIDValues is empty for + // OT_DATAVALUE, OT_BOTH(vectWriteColValues takes care about RIDs). + valuesWritten = vectWriteRIDValues( + simdProcessor, valuesWritten, validMinMax, ridOffset, dataVecTPtr, ridDstArray, writeMask, min, max, + in, out, nonNullOrEmptyMask, ridArray); if constexpr (KIND != KIND_TEXT) vectorizedUpdateMinMax(validMinMax, nonNullOrEmptyMask, simdProcessor, dataVec, simdMin, simdMax); @@ -1815,15 +1739,14 @@ void vectorizedFiltering(NewColRequestHeader* in, ColResultHeader* out, const T* vectorizedTextUpdateMinMax(validMinMax, nonNullOrEmptyMask, simdProcessor, dataVec, simdMin, simdMax, swapedOrderDataVec, weightsMin, weightsMax); - // Calculate bytes written - uint16_t bytesWritten = valuesWritten * WIDTH; - totalValuesWritten += valuesWritten; - ridDstArray += valuesWritten; - dstArray += bytesWritten; - rid += VECTOR_SIZE; - srcArray += VECTOR_SIZE; - ridArray += VECTOR_SIZE; - } + // Calculate bytes written + uint16_t bytesWritten = valuesWritten * WIDTH; + totalValuesWritten += valuesWritten; + ridDstArray += valuesWritten; + dstArray += bytesWritten; + rid += VECTOR_SIZE; + srcArray += VECTOR_SIZE; + ridArray += VECTOR_SIZE; } if constexpr (KIND != KIND_TEXT) @@ -1847,7 +1770,33 @@ void vectorizedFiltering(NewColRequestHeader* in, ColResultHeader* out, const T* scalarFiltering(in, out, columnFilterMode, filterSet, filterCount, filterCOPs, filterValues, filterRFs, in->colType, origSrcArray, srcSize, origRidArray, ridSize, processedSoFar, outputType, validMinMax, emptyValue, nullValue, - min, max, isNullValueMatches, hasAuxCol, blockAux, emptyValueAux); + min, max, isNullValueMatches, blockAux, emptyValueAux); +} + +template +void vectorizedFiltering(NewColRequestHeader* in, ColResultHeader* out, const T* srcArray, + const uint32_t srcSize, primitives::RIDType* ridArray, const uint16_t ridSize, + ParsedColumnFilter* parsedColumnFilter, const bool validMinMax, const T emptyValue, + const T nullValue, T min, T max, const bool isNullValueMatches, + const uint8_t* blockAux, uint8_t emptyValueAux) +{ + if (in->hasAuxCol) + { + vectorizedFiltering_( + in, out, srcArray, srcSize, ridArray, ridSize, + parsedColumnFilter, validMinMax, emptyValue, + nullValue, min, max, isNullValueMatches, + blockAux, emptyValueAux); + } + else + { + vectorizedFiltering_( + in, out, srcArray, srcSize, ridArray, ridSize, + parsedColumnFilter, validMinMax, emptyValue, + nullValue, min, max, isNullValueMatches, + blockAux, emptyValueAux); + } } // This routine dispatches template function calls to reduce branching. @@ -1858,7 +1807,7 @@ void vectorizedFilteringDispatcher(NewColRequestHeader* in, ColResultHeader* out const bool validMinMax, const STORAGE_TYPE emptyValue, const STORAGE_TYPE nullValue, STORAGE_TYPE Min, STORAGE_TYPE Max, const bool isNullValueMatches, - const bool hasAuxCol, const uint8_t* blockAux, uint8_t emptyValueAux) + const uint8_t* blockAux, uint8_t emptyValueAux) { // Using struct to dispatch SIMD type based on integral type T. using SimdType = typename simd::IntegralToSIMD::type; @@ -1873,22 +1822,22 @@ void vectorizedFilteringDispatcher(NewColRequestHeader* in, ColResultHeader* out case OT_RID: vectorizedFiltering( in, out, srcArray, srcSize, ridArray, ridSize, parsedColumnFilter, validMinMax, emptyValue, - nullValue, Min, Max, isNullValueMatches, hasAuxCol, blockAux, emptyValueAux); + nullValue, Min, Max, isNullValueMatches, blockAux, emptyValueAux); break; case OT_BOTH: vectorizedFiltering( in, out, srcArray, srcSize, ridArray, ridSize, parsedColumnFilter, validMinMax, emptyValue, - nullValue, Min, Max, isNullValueMatches, hasAuxCol, blockAux, emptyValueAux); + nullValue, Min, Max, isNullValueMatches, blockAux, emptyValueAux); break; case OT_TOKEN: vectorizedFiltering( in, out, srcArray, srcSize, ridArray, ridSize, parsedColumnFilter, validMinMax, emptyValue, - nullValue, Min, Max, isNullValueMatches, hasAuxCol, blockAux, emptyValueAux); + nullValue, Min, Max, isNullValueMatches, blockAux, emptyValueAux); break; case OT_DATAVALUE: vectorizedFiltering( in, out, srcArray, srcSize, ridArray, ridSize, parsedColumnFilter, validMinMax, emptyValue, - nullValue, Min, Max, isNullValueMatches, hasAuxCol, blockAux, emptyValueAux); + nullValue, Min, Max, isNullValueMatches, blockAux, emptyValueAux); break; } } @@ -1900,22 +1849,22 @@ void vectorizedFilteringDispatcher(NewColRequestHeader* in, ColResultHeader* out case OT_RID: vectorizedFiltering( in, out, srcArray, srcSize, ridArray, ridSize, parsedColumnFilter, validMinMax, emptyValue, - nullValue, Min, Max, isNullValueMatches, hasAuxCol, blockAux, emptyValueAux); + nullValue, Min, Max, isNullValueMatches, blockAux, emptyValueAux); break; case OT_BOTH: vectorizedFiltering( in, out, srcArray, srcSize, ridArray, ridSize, parsedColumnFilter, validMinMax, emptyValue, - nullValue, Min, Max, isNullValueMatches, hasAuxCol, blockAux, emptyValueAux); + nullValue, Min, Max, isNullValueMatches, blockAux, emptyValueAux); break; case OT_TOKEN: vectorizedFiltering( in, out, srcArray, srcSize, ridArray, ridSize, parsedColumnFilter, validMinMax, emptyValue, - nullValue, Min, Max, isNullValueMatches, hasAuxCol, blockAux, emptyValueAux); + nullValue, Min, Max, isNullValueMatches, blockAux, emptyValueAux); break; case OT_DATAVALUE: vectorizedFiltering( in, out, srcArray, srcSize, ridArray, ridSize, parsedColumnFilter, validMinMax, emptyValue, - nullValue, Min, Max, isNullValueMatches, hasAuxCol, blockAux, emptyValueAux); + nullValue, Min, Max, isNullValueMatches, blockAux, emptyValueAux); break; } } @@ -1933,7 +1882,7 @@ void filterColumnData(NewColRequestHeader* in, ColResultHeader* out, uint16_t* r const uint16_t ridSize, // Number of values in ridArray int* srcArray16, const uint32_t srcSize, boost::shared_ptr parsedColumnFilter, - bool hasAuxCol, int* blockAux) + int* blockAux) { using FT = typename IntegralTypeToFilterType::type; using ST = typename IntegralTypeToFilterSetType::type; @@ -1996,7 +1945,7 @@ void filterColumnData(NewColRequestHeader* in, ColResultHeader* out, uint16_t* r vectorizedFilteringDispatcher(in, out, srcArray, srcSize, ridArray, ridSize, parsedColumnFilter.get(), validMinMax, emptyValue, nullValue, Min, Max, isNullValueMatches, - hasAuxCol, reinterpret_cast(blockAux), + reinterpret_cast(blockAux), emptyValueAux); return; } @@ -2006,7 +1955,7 @@ void filterColumnData(NewColRequestHeader* in, ColResultHeader* out, uint16_t* r scalarFiltering(in, out, columnFilterMode, filterSet, filterCount, filterCOPs, filterValues, filterRFs, in->colType, srcArray, srcSize, ridArray, ridSize, initialRID, outputType, validMinMax, emptyValue, nullValue, Min, Max, - isNullValueMatches, hasAuxCol, reinterpret_cast(blockAux), + isNullValueMatches, reinterpret_cast(blockAux), emptyValueAux); } // end of filterColumnData @@ -2043,8 +1992,7 @@ template ::type* = nullptr> #endif void PrimitiveProcessor::scanAndFilterTypeDispatcher(NewColRequestHeader* in, - ColResultHeader* out, - bool hasAuxCol) + ColResultHeader* out) { constexpr int W = sizeof(T); auto dataType = (execplan::CalpontSystemCatalog::ColDataType)in->colType.DataType; @@ -2053,10 +2001,10 @@ void PrimitiveProcessor::scanAndFilterTypeDispatcher(NewColRequestHeader* in, const uint16_t ridSize = in->NVALS; uint16_t* ridArray = in->getRIDArrayPtr(W); const uint32_t itemsPerBlock = logicalBlockMode ? BLOCK_SIZE : BLOCK_SIZE / W; - filterColumnData(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter, hasAuxCol, blockAux); + filterColumnData(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter, blockAux); return; } - _scanAndFilterTypeDispatcher(in, out, hasAuxCol); + _scanAndFilterTypeDispatcher(in, out); } template ::type* = nullptr> #endif void PrimitiveProcessor::scanAndFilterTypeDispatcher(NewColRequestHeader* in, - ColResultHeader* out, - bool hasAuxCol) + ColResultHeader* out) { constexpr int W = sizeof(T); auto dataType = (execplan::CalpontSystemCatalog::ColDataType)in->colType.DataType; @@ -2080,10 +2027,10 @@ void PrimitiveProcessor::scanAndFilterTypeDispatcher(NewColRequestHeader* in, const uint16_t ridSize = in->NVALS; uint16_t* ridArray = in->getRIDArrayPtr(W); const uint32_t itemsPerBlock = logicalBlockMode ? BLOCK_SIZE : BLOCK_SIZE / W; - filterColumnData(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter, hasAuxCol, blockAux); + filterColumnData(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter, blockAux); return; } - _scanAndFilterTypeDispatcher(in, out, hasAuxCol); + _scanAndFilterTypeDispatcher(in, out); } template ::type* = nullptr> #endif void PrimitiveProcessor::scanAndFilterTypeDispatcher(NewColRequestHeader* in, - ColResultHeader* out, - bool hasAuxCol) + ColResultHeader* out) { - _scanAndFilterTypeDispatcher(in, out, hasAuxCol); + _scanAndFilterTypeDispatcher(in, out); } template ::type* = nullptr> #endif void PrimitiveProcessor::_scanAndFilterTypeDispatcher(NewColRequestHeader* in, - ColResultHeader* out, - bool hasAuxCol) + ColResultHeader* out) { constexpr int W = sizeof(T); const uint16_t ridSize = in->NVALS; uint16_t* ridArray = in->getRIDArrayPtr(W); const uint32_t itemsPerBlock = logicalBlockMode ? BLOCK_SIZE : BLOCK_SIZE / W; - filterColumnData(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter, hasAuxCol, blockAux); + filterColumnData(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter, blockAux); } template ::type* = nullptr> #endif void PrimitiveProcessor::_scanAndFilterTypeDispatcher(NewColRequestHeader* in, - ColResultHeader* out, - bool hasAuxCol) + ColResultHeader* out) { constexpr int W = sizeof(T); using UT = typename std::conditional::value || datatypes::is_uint128_t::value, T, @@ -2155,23 +2099,22 @@ void PrimitiveProcessor::_scanAndFilterTypeDispatcher(NewColRequestHeader* in, dataType == execplan::CalpontSystemCatalog::TEXT) && !isDictTokenScan(in)) { - filterColumnData(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter, hasAuxCol, blockAux); + filterColumnData(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter, blockAux); return; } if (datatypes::isUnsigned(dataType)) { - filterColumnData(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter, hasAuxCol, blockAux); + filterColumnData(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter, blockAux); return; } - filterColumnData(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter, hasAuxCol, blockAux); + filterColumnData(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter, blockAux); } // The entrypoint for block scanning and filtering. // The block is in in msg, out msg is used to store values|RIDs matched. template -void PrimitiveProcessor::columnScanAndFilter(NewColRequestHeader* in, ColResultHeader* out, - bool hasAuxCol) +void PrimitiveProcessor::columnScanAndFilter(NewColRequestHeader* in, ColResultHeader* out) { #ifdef PRIM_DEBUG auto markEvent = [&](char eventChar) @@ -2210,26 +2153,21 @@ void PrimitiveProcessor::columnScanAndFilter(NewColRequestHeader* in, ColResultH // Sort ridArray (the row index array) if there are RIDs with this in msg in->sortRIDArrayIfNeeded(W); - scanAndFilterTypeDispatcher(in, out, hasAuxCol); + scanAndFilterTypeDispatcher(in, out); #ifdef PRIM_DEBUG markEvent('C'); #endif } template void primitives::PrimitiveProcessor::columnScanAndFilter(NewColRequestHeader*, - ColResultHeader*, - bool); + ColResultHeader*); template void primitives::PrimitiveProcessor::columnScanAndFilter(NewColRequestHeader*, - ColResultHeader*, - bool); + ColResultHeader*); template void primitives::PrimitiveProcessor::columnScanAndFilter(NewColRequestHeader*, - ColResultHeader*, - bool); + ColResultHeader*); template void primitives::PrimitiveProcessor::columnScanAndFilter(NewColRequestHeader*, - ColResultHeader*, - bool); + ColResultHeader*); template void primitives::PrimitiveProcessor::columnScanAndFilter(NewColRequestHeader*, - ColResultHeader*, - bool); + ColResultHeader*); } // namespace primitives diff --git a/primitives/linux-port/primitiveprocessor.h b/primitives/linux-port/primitiveprocessor.h index 16a2ba416..23340f5e3 100644 --- a/primitives/linux-port/primitiveprocessor.h +++ b/primitives/linux-port/primitiveprocessor.h @@ -396,20 +396,20 @@ class PrimitiveProcessor template ::type* = nullptr> - void scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out, bool hasAuxCol); + void scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out); template ::type* = nullptr> - void scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out, bool hasAuxCol); + void scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out); template ::type* = nullptr> - void scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out, bool hasAuxCol); + void scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out); template ::type* = nullptr> - void _scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out, bool hasAuxCol); + void _scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out); template ::type* = nullptr> - void _scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out, bool hasAuxCol); + void _scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out); template - void columnScanAndFilter(NewColRequestHeader* in, ColResultHeader* out, bool hasAuxCol); + void columnScanAndFilter(NewColRequestHeader* in, ColResultHeader* out); boost::shared_ptr parseColumnFilter(const uint8_t* filterString, uint32_t colWidth, uint32_t colType, uint32_t filterCount, diff --git a/primitives/primproc/columncommand.cpp b/primitives/primproc/columncommand.cpp index e70521d8f..b922ef191 100644 --- a/primitives/primproc/columncommand.cpp +++ b/primitives/primproc/columncommand.cpp @@ -203,7 +203,7 @@ void ColumnCommand::_loadData() bpp->physIO += blocksRead; bpp->touchedBlocks += blocksToLoad; - if (_hasAuxCol) + if (hasAuxCol_) { BRM::LBID_t* lbidsAux = (BRM::LBID_t*)alloca(1 * sizeof(BRM::LBID_t)); uint8_t** blockPtrsAux = (uint8_t**)alloca(1 * sizeof(uint8_t*)); @@ -288,9 +288,10 @@ template void ColumnCommand::_issuePrimitive() { using IntegralType = typename datatypes::WidthToSIntegralType::type; + primMsg->hasAuxCol = hasAuxCol_; // Down the call stack the code presumes outMsg buffer has enough space to store // ColRequestHeader + uint16_t Rids[8192] + IntegralType[8192]. - bpp->getPrimitiveProcessor().columnScanAndFilter(primMsg, outMsg, _hasAuxCol); + bpp->getPrimitiveProcessor().columnScanAndFilter(primMsg, outMsg); } // _issuePrimitive() void ColumnCommand::updateCPDataNarrow() @@ -541,7 +542,7 @@ void ColumnCommand::createCommand(ByteStream& bs) bs >> BOP; bs >> filterCount; bs >> tmp8; - _hasAuxCol = tmp8; + hasAuxCol_ = tmp8; deserializeInlineVector(bs, lastLbid); Command::createCommand(bs); @@ -575,7 +576,7 @@ void ColumnCommand::createCommand(execplan::CalpontSystemCatalog::ColType& aColT bs >> BOP; bs >> filterCount; bs >> tmp8; - _hasAuxCol = tmp8; + hasAuxCol_ = tmp8; deserializeInlineVector(bs, lastLbid); Command::createCommand(bs); @@ -585,7 +586,7 @@ void ColumnCommand::resetCommand(ByteStream& bs) { bs >> lbid; - if (_hasAuxCol) + if (hasAuxCol_) bs >> lbidAux; } @@ -848,14 +849,14 @@ void ColumnCommand::nextLBID() { lbid += colType.colWidth; - if (_hasAuxCol) + if (hasAuxCol_) lbidAux += execplan::AUX_COL_WIDTH; } void ColumnCommand::duplicate(ColumnCommand* cc) { cc->_isScan = _isScan; - cc->_hasAuxCol = _hasAuxCol; + cc->hasAuxCol_ = hasAuxCol_; cc->traceFlags = traceFlags; cc->filterString = filterString; cc->colType.colDataType = colType.colDataType; @@ -887,7 +888,7 @@ bool ColumnCommand::operator==(const ColumnCommand& cc) const if (_isScan != cc._isScan) return false; - if (_hasAuxCol != cc._hasAuxCol) + if (hasAuxCol_ != cc.hasAuxCol_) return false; if (BOP != cc.BOP) @@ -919,7 +920,7 @@ bool ColumnCommand::operator!=(const ColumnCommand& cc) const ColumnCommand& ColumnCommand::operator=(const ColumnCommand& c) { _isScan = c._isScan; - _hasAuxCol = c._hasAuxCol; + hasAuxCol_ = c.hasAuxCol_; traceFlags = c.traceFlags; filterString = c.filterString; colType.colDataType = c.colType.colDataType; @@ -958,7 +959,7 @@ void ColumnCommand::getLBIDList(uint32_t loopCount, vector* lbids) for (i = firstLBID; i <= lastLBID; i++) lbids->push_back(i); - if (_hasAuxCol) + if (hasAuxCol_) { firstLBID = lbidAux; lastLBID = firstLBID + (loopCount * execplan::AUX_COL_WIDTH) - 1; diff --git a/primitives/primproc/columncommand.h b/primitives/primproc/columncommand.h index 070655b58..19a4da597 100644 --- a/primitives/primproc/columncommand.h +++ b/primitives/primproc/columncommand.h @@ -79,7 +79,7 @@ class ColumnCommand : public Command bool hasAuxCol() const { - return _hasAuxCol; + return hasAuxCol_; } uint64_t getLBIDAux() const { @@ -173,7 +173,7 @@ class ColumnCommand : public Command uint32_t baseMsgLength; uint64_t lbid; - bool _hasAuxCol; + bool hasAuxCol_; uint64_t lbidAux; uint32_t traceFlags; // probably move this to Command uint8_t BOP; diff --git a/tests/primitives_column_scan_and_filter.cpp b/tests/primitives_column_scan_and_filter.cpp index 9b0be4026..f017892d0 100644 --- a/tests/primitives_column_scan_and_filter.cpp +++ b/tests/primitives_column_scan_and_filter.cpp @@ -136,7 +136,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan1Byte) in->NVALS = 0; pp.setBlockPtr((int*)readBlockFromLiteralArray("col1block.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); results = getValuesArrayPosition(getFirstValueArrayPosition(out), 0); EXPECT_EQ(out->NVALS, 8160); @@ -160,7 +160,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan1ByteVectorized) in->NVALS = 0; pp.setBlockPtr((int*)readBlockFromLiteralArray("col1block.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); results = getValuesArrayPosition(getFirstValueArrayPosition(out), 0); EXPECT_EQ(out->NVALS, 8160); @@ -196,7 +196,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan2Bytes) in->NVALS = 0; pp.setBlockPtr((int*)readBlockFromLiteralArray("col2block.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); results = getValuesArrayPosition(getFirstValueArrayPosition(out), 0); EXPECT_EQ(out->NVALS, 4096); @@ -223,7 +223,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan4Bytes) in->NVALS = 0; pp.setBlockPtr((int*)readBlockFromLiteralArray("col4block.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); results = getValuesArrayPosition(getFirstValueArrayPosition(out), 0); EXPECT_EQ(out->NVALS, 2048); @@ -251,7 +251,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan8Bytes) in->NVALS = 0; pp.setBlockPtr((int*)readBlockFromLiteralArray("col8block.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); results = getValuesArrayPosition(getFirstValueArrayPosition(out), 0); ASSERT_EQ(out->NVALS, 1024); @@ -287,7 +287,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan2Bytes1EqFilter) &input[sizeof(NewColRequestHeader) + sizeof(ColArgs) + in->colType.DataSize]); pp.setBlockPtr((int*)readBlockFromLiteralArray("col2block.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); results = getValuesArrayPosition(getFirstValueArrayPosition(out), 0); ASSERT_EQ(out->NVALS, 51); @@ -315,7 +315,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan1ByteUsingRID) rids[1] = 17; pp.setBlockPtr((int*)readBlockFromLiteralArray("col1block.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); results = getValuesArrayPosition(getFirstValueArrayPosition(out), 0); ASSERT_EQ(out->NVALS, 2); @@ -344,7 +344,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan1ByteUsingMultipleRIDs) rids[126] = 8189; pp.setBlockPtr((int*)readBlockFromLiteralArray("col1block.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); results = getValuesArrayPosition(getFirstValueArrayPosition(out), 0); ASSERT_EQ(out->NVALS, expectedNVALS); @@ -376,7 +376,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan4Bytes1EqFilter) &input[sizeof(NewColRequestHeader) + sizeof(ColArgs) + in->colType.DataSize]); pp.setBlockPtr((int*)readBlockFromLiteralArray("col4block.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); results = getValuesArrayPosition(getFirstValueArrayPosition(out), 0); ASSERT_EQ(out->NVALS, 8); @@ -404,7 +404,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan4BytesUsingMultipleRIDs) rids[126] = 1020; pp.setBlockPtr((int*)readBlockFromLiteralArray("col4block.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); results = getValuesArrayPosition(getFirstValueArrayPosition(out), 0); @@ -438,7 +438,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan4Bytes2Filters) memcpy(args->val, &tmp, in->colType.DataSize); pp.setBlockPtr((int*)readBlockFromLiteralArray("col4block.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); results = getValuesArrayPosition(getFirstValueArrayPosition(out), 0); ASSERT_EQ(out->NVALS, 9); @@ -477,7 +477,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan8Bytes1EqFilter) &input[sizeof(NewColRequestHeader) + sizeof(ColArgs) + in->colType.DataSize]); pp.setBlockPtr((int*)readBlockFromLiteralArray("col8block.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); results = getValuesArrayPosition(getFirstValueArrayPosition(out), 0); ASSERT_EQ(out->NVALS, 11); @@ -511,7 +511,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan8BytesUsingMultipleRIDs) rids[126] = 1020; pp.setBlockPtr((int*)readBlockFromLiteralArray("col8block.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); results = getValuesArrayPosition(getFirstValueArrayPosition(out), 0); @@ -547,7 +547,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan8Bytes2CompFilters) memcpy(args->val, &tmp, in->colType.DataSize); pp.setBlockPtr((int*)readBlockFromLiteralArray("col8block.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); results = getValuesArrayPosition(getFirstValueArrayPosition(out), 0); ASSERT_EQ(out->NVALS, 33); @@ -587,7 +587,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan8Bytes2EqFilters) memcpy(args->val, &tmp, in->colType.DataSize); pp.setBlockPtr((int*)readBlockFromLiteralArray("col8block.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); results = getValuesArrayPosition(getFirstValueArrayPosition(out), 0); @@ -631,7 +631,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan8Bytes2EqFiltersRID) rids[1] = 100; pp.setBlockPtr((int*)readBlockFromLiteralArray("col8block.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); results = getValuesArrayPosition(getFirstValueArrayPosition(out), 0); ASSERT_EQ(out->NVALS, 1); @@ -662,7 +662,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan8Bytes2FiltersRIDOutputRid) memcpy(args->val, &tmp, in->colType.DataSize); pp.setBlockPtr((int*)readBlockFromLiteralArray("col8block.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); results = getValuesArrayPosition(getFirstRIDArrayPosition(out), 0); ASSERT_EQ(out->NVALS, 33); @@ -701,7 +701,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan8Bytes2EqFiltersRIDOutputBoth) memcpy(args->val, &tmp, in->colType.DataSize); pp.setBlockPtr((int*)readBlockFromLiteralArray("col8block.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); ASSERT_EQ(out->NVALS, 33); @@ -742,7 +742,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan1Byte2CompFilters) args->val[0] = '4'; pp.setBlockPtr((int*)readBlockFromLiteralArray("col1block.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); results = getValuesArrayPosition(getFirstValueArrayPosition(out), 0); ASSERT_EQ(out->NVALS, 32); @@ -791,7 +791,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan4Bytes2CompFiltersOutputRID) memcpy(&args->val[in->colType.DataSize], &ridTmp, 2); pp.setBlockPtr((int*) readBlockFromLiteralArray("col4block.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); results = reinterpret_cast(&output[sizeof(ColResultHeader)]); ASSERT_EQ(out->NVALS, 2); @@ -823,7 +823,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan8BytesDouble2CompFilters) memcpy(args->val, &tmp, sizeof(tmp)); pp.setBlockPtr((int*)readBlockFromLiteralArray("col_double_block.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); results = getValuesArrayPosition(getFirstValueArrayPosition(out), 0); ASSERT_EQ(out->NVALS, 8); @@ -857,7 +857,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan4BytesFloat2CompFiltersOutputBoth) memcpy(args->val, &tmp, sizeof(tmp)); pp.setBlockPtr((int*)readBlockFromLiteralArray("col_float_block.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); ASSERT_EQ(out->NVALS, 8); @@ -892,7 +892,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan4BytesNegFloat2CompFiltersOutputBoth) memcpy(args->val, &tmp, sizeof(tmp)); pp.setBlockPtr((int*)readBlockFromLiteralArray("col_neg_float.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); ASSERT_EQ(out->NVALS, 19); for (i = 0; i < out->NVALS; i++) @@ -926,7 +926,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan8BytesNegDouble2CompFilters) memcpy(args->val, &tmp, sizeof(tmp)); pp.setBlockPtr((int*)readBlockFromLiteralArray("col_neg_double.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); ASSERT_EQ(out->NVALS, 19); @@ -950,7 +950,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan16Bytes) in->NVALS = 0; pp.setBlockPtr((int*)readBlockFromLiteralArray("col16block.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); results = getValuesArrayPosition(getFirstValueArrayPosition(out), 0); @@ -992,7 +992,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan16Bytes2CompFilters) memcpy(args->val, &tmp, in->colType.DataSize); pp.setBlockPtr((int*)readBlockFromLiteralArray("col16block.cdf", block)); - pp.columnScanAndFilter(in, out, false); + pp.columnScanAndFilter(in, out); results = getValuesArrayPosition(getFirstValueArrayPosition(out), 0); diff --git a/versioning/BRM/dbrm.cpp b/versioning/BRM/dbrm.cpp index fcc7d89cd..609d5d1e6 100644 --- a/versioning/BRM/dbrm.cpp +++ b/versioning/BRM/dbrm.cpp @@ -43,6 +43,7 @@ #include "configcpp.h" #include "sessionmanagerserver.h" #include "messagequeuepool.h" +#include "blocksize.h" #define DBRM_DLLEXPORT #include "dbrm.h" #undef DBRM_DLLEXPORT @@ -4513,7 +4514,7 @@ void DBRM::addToLBIDList(uint32_t sessionID, vector& lbidList) } } - uint32_t extentNumAux = fbo / (execplan::AUX_COL_WIDTH * 1024); + uint32_t extentNumAux = fbo / ((getExtentRows() * execplan::AUX_COL_WIDTH) / BLOCK_SIZE); for (auto iter = extentMap[tableOid].begin(); iter != extentMap[tableOid].end(); iter++) { diff --git a/writeengine/bulk/we_bulkload.cpp b/writeengine/bulk/we_bulkload.cpp index 9b8905d6e..a152ae137 100644 --- a/writeengine/bulk/we_bulkload.cpp +++ b/writeengine/bulk/we_bulkload.cpp @@ -265,7 +265,7 @@ int BulkLoad::loadJobInfo(const string& fullName, bool bUseTempJobFile, int argc return rc; } - Job& curJob = const_cast(fJobInfo.getJob()); + Job& curJob = fJobInfo.getJob(); string logFile, errlogFile; logFile = std::string(MCSLOGDIR) + "/cpimport/" + "Job_" + Convertor::int2Str(curJob.id) + LOG_SUFFIX; errlogFile = @@ -319,6 +319,8 @@ int BulkLoad::loadJobInfo(const string& fullName, bool bUseTempJobFile, int argc execplan::CalpontSystemCatalog::OID tableAUXColOid; std::string tblName; std::string curTblName = curJob.jobTableList[i].tblName; + + // Parse out from [.] string string::size_type startName = curTblName.rfind('.'); if (startName == std::string::npos) @@ -376,18 +378,10 @@ int BulkLoad::loadJobInfo(const string& fullName, bool bUseTempJobFile, int argc // tableAUXColOid = 0 if (tableAUXColOid > 3000) { - JobColumn curColumn; - curColumn.colName = "aux"; - curColumn.mapOid = tableAUXColOid; - curColumn.typeName = execplan::AUX_COL_DATATYPE_STRING; - curColumn.width = execplan::AUX_COL_WIDTH; - curColumn.definedWidth = execplan::AUX_COL_WIDTH; - curColumn.compressionType = execplan::AUX_COL_COMPRESSION_TYPE; - curColumn.dctnry.fCompressionType = execplan::AUX_COL_COMPRESSION_TYPE; - curColumn.fMinIntSat = execplan::AUX_COL_MINVALUE; - curColumn.fMaxIntSat = execplan::AUX_COL_MAXVALUE; - curColumn.fWithDefault = true; - curColumn.fDefaultUInt = 1; + JobColumn curColumn("aux", tableAUXColOid, execplan::AUX_COL_DATATYPE_STRING, + execplan::AUX_COL_WIDTH, execplan::AUX_COL_WIDTH, + execplan::AUX_COL_COMPRESSION_TYPE, execplan::AUX_COL_COMPRESSION_TYPE, + execplan::AUX_COL_MINVALUE, execplan::AUX_COL_MAXVALUE, true, 1); curJob.jobTableList[i].colList.push_back(curColumn); JobFieldRef fieldRef(BULK_FLDCOL_COLUMN_DEFAULT, curJob.jobTableList[i].colList.size() - 1); curJob.jobTableList[i].fFldRefs.push_back(fieldRef); diff --git a/writeengine/server/we_dmlcommandproc.cpp b/writeengine/server/we_dmlcommandproc.cpp index 264a47dd1..5eb5cfbc1 100644 --- a/writeengine/server/we_dmlcommandproc.cpp +++ b/writeengine/server/we_dmlcommandproc.cpp @@ -91,6 +91,27 @@ WE_DMLCommandProc::~WE_DMLCommandProc() dbRootExtTrackerVec.clear(); } +void WE_DMLCommandProc::processAuxCol(const std::vector& origVals, + WriteEngine::ColValueList& colValuesList, + WriteEngine::DictStrList& dicStringList) +{ + WriteEngine::ColTupleList auxColTuples; + WriteEngine::dictStr auxDicStrings; + + for (uint32_t j = 0; j < origVals.size(); j++) + { + WriteEngine::ColTuple auxColTuple; + auxColTuple.data = (uint8_t)1; + auxColTuples.push_back(auxColTuple); + //@Bug 2515. Only pass string values to write engine + auxDicStrings.push_back(""); + } + + colValuesList.push_back(auxColTuples); + //@Bug 2515. Only pass string values to write engine + dicStringList.push_back(auxDicStrings); +} + uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std::string& err) { uint8_t rc = 0; @@ -443,22 +464,7 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std: // MCOL-5021 if ((i == numcols - 1) && (tableAUXColOid > 3000)) { - WriteEngine::ColTupleList auxColTuples; - WriteEngine::dictStr auxDicStrings; - - for (uint32_t j = 0; j < origVals.size(); j++) - { - WriteEngine::ColTuple auxColTuple; - auxColTuple.data = (uint8_t)1; - - auxColTuples.push_back(auxColTuple); - //@Bug 2515. Only pass string values to write engine - auxDicStrings.push_back(""); - } - - colValuesList.push_back(auxColTuples); - //@Bug 2515. Only pass string values to write engine - dicStringList.push_back(auxDicStrings); + processAuxCol(origVals, colValuesList, dicStringList); } ++row_iterator; @@ -1385,22 +1391,7 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std:: // MCOL-5021 if ((i == numcols - 1) && (tableAUXColOid > 3000)) { - WriteEngine::ColTupleList auxColTuples; - WriteEngine::dictStr auxDicStrings; - - for (uint32_t j = 0; j < origVals.size(); j++) - { - WriteEngine::ColTuple auxColTuple; - auxColTuple.data = (uint8_t)1; - - auxColTuples.push_back(auxColTuple); - //@Bug 2515. Only pass string values to write engine - auxDicStrings.push_back(""); - } - - colValuesList.push_back(auxColTuples); - //@Bug 2515. Only pass string values to write engine - dicStringList.push_back(auxDicStrings); + processAuxCol(origVals, colValuesList, dicStringList); } ++row_iterator; @@ -1502,7 +1493,6 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std:: return rc; } -#if 0 uint8_t WE_DMLCommandProc::processBatchInsertBinary(messageqcpp::ByteStream& bs, std::string& err, ByteStream::quadbyte& PMId) { @@ -1665,8 +1655,9 @@ uint8_t WE_DMLCommandProc::processBatchInsertBinary(messageqcpp::ByteStream& bs, return rc; } - //@Bug 5996 validate hwm before starts - rc = validateColumnHWMs(ridList, systemCatalogPtr, colDBRootExtentInfo, "Starting"); + // @Bug 5996 validate hwm before starts + // TODO MCOL-5021 hasAuxCol is hardcoded to false; add support here. + rc = validateColumnHWMs(ridList, systemCatalogPtr, colDBRootExtentInfo, "Starting", false); if (rc != 0) { @@ -2233,7 +2224,6 @@ uint8_t WE_DMLCommandProc::processBatchInsertBinary(messageqcpp::ByteStream& bs, CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000); return rc; } -#endif uint8_t WE_DMLCommandProc::commitBatchAutoOn(messageqcpp::ByteStream& bs, std::string& err) { diff --git a/writeengine/server/we_dmlcommandproc.h b/writeengine/server/we_dmlcommandproc.h index 5db46d8d4..c80be75ba 100644 --- a/writeengine/server/we_dmlcommandproc.h +++ b/writeengine/server/we_dmlcommandproc.h @@ -78,8 +78,8 @@ class WE_DMLCommandProc EXPORT uint8_t rollbackVersion(messageqcpp::ByteStream& bs, std::string& err); EXPORT uint8_t processBatchInsert(messageqcpp::ByteStream& bs, std::string& err, ByteStream::quadbyte& PMId); - //EXPORT uint8_t processBatchInsertBinary(messageqcpp::ByteStream& bs, std::string& err, - // ByteStream::quadbyte& PMId); + EXPORT uint8_t processBatchInsertBinary(messageqcpp::ByteStream& bs, std::string& err, + ByteStream::quadbyte& PMId); EXPORT uint8_t commitBatchAutoOn(messageqcpp::ByteStream& bs, std::string& err); EXPORT uint8_t commitBatchAutoOff(messageqcpp::ByteStream& bs, std::string& err); EXPORT uint8_t rollbackBatchAutoOn(messageqcpp::ByteStream& bs, std::string& err); @@ -127,6 +127,10 @@ class WE_DMLCommandProc const std::vector& files, const std::vector& oidsToFlush, std::string& err); + void processAuxCol(const std::vector& origVals, + WriteEngine::ColValueList& colValuesList, + WriteEngine::DictStrList& dicStringList); + bool fIsFirstBatchPm; std::map rowGroups; std::map cpackages; diff --git a/writeengine/server/we_readthread.cpp b/writeengine/server/we_readthread.cpp index 95bb4c4e2..9da77a3a2 100644 --- a/writeengine/server/we_readthread.cpp +++ b/writeengine/server/we_readthread.cpp @@ -156,13 +156,11 @@ void DmlReadThread::operator()() break; } -#if 0 case WE_SVR_BATCH_INSERT_BINARY: { rc = fWeDMLprocessor->processBatchInsertBinary(ibs, errMsg, PMId); break; } -#endif case WE_SVR_GET_WRITTEN_LBIDS: { diff --git a/writeengine/shared/we_config.cpp b/writeengine/shared/we_config.cpp index e3a957a87..47c0f8644 100644 --- a/writeengine/shared/we_config.cpp +++ b/writeengine/shared/we_config.cpp @@ -188,7 +188,8 @@ void Config::checkReload() const std::string fastDeleteTemp = cf->getConfig("WriteEngine", "FastDelete"); - if (fastDeleteTemp.length() != 0 && boost::iequals(fastDeleteTemp, "true")) + if (fastDeleteTemp.length() != 0 && + (fastDeleteTemp == "y" || fastDeleteTemp == "Y")) { m_FastDelete = true; } diff --git a/writeengine/shared/we_type.h b/writeengine/shared/we_type.h index 7bd11f892..dca7c05db 100644 --- a/writeengine/shared/we_type.h +++ b/writeengine/shared/we_type.h @@ -436,6 +436,38 @@ struct JobColumn /** @brief Job Column Structure */ , fDefaultWideDecimal(0) { } + JobColumn(const std::string& colName_, OID mapOid_, const std::string& typeName_, + int width_, int definedWidth_, int compressionType_, int dctnryCompressionType_, + int64_t minIntSat_, uint64_t maxIntSat_, bool withDefault_, + unsigned long long defaultUInt_) + : colName(colName_) + , mapOid(mapOid_) + , dataType(execplan::CalpontSystemCatalog::INT) + , weType(WR_INT) + , typeName(typeName_) + , emptyVal(nullptr) + , width(width_) + , definedWidth(definedWidth_) + , dctnryWidth(0) + , precision(0) + , scale(0) + , fNotNull(false) + , fFldColRelation(BULK_FLDCOL_COLUMN_FIELD) + , colType(' ') + , compressionType(compressionType_) + , autoIncFlag(false) + , fMinIntSat(minIntSat_) + , fMaxIntSat(maxIntSat_) + , fMinDblSat(0) + , fMaxDblSat(0) + , fWithDefault(withDefault_) + , fDefaultInt(0) + , fDefaultUInt(defaultUInt_) + , fDefaultDbl(0.0) + , fDefaultWideDecimal(0) + { + dctnry.fCompressionType = dctnryCompressionType_; + } }; typedef std::vector JobColList; /** @brief column value list */ diff --git a/writeengine/wrapper/we_colop.cpp b/writeengine/wrapper/we_colop.cpp index fc3a22a2b..d680fdd08 100644 --- a/writeengine/wrapper/we_colop.cpp +++ b/writeengine/wrapper/we_colop.cpp @@ -1412,12 +1412,12 @@ int ColumnOp::openColumnFile(Column& column, std::string& segFile, bool useTmpSu if (!isValid(column)) return ERR_INVALID_PARAM; - std::string mode; + std::string mode = "r"; - if (isReadOnly) - mode = "r"; - else + if (!isReadOnly) + { mode = "r+b"; + } // open column data file column.dataFile.pFile = @@ -1817,10 +1817,9 @@ int ColumnOp::writeRowsReadOnly(Column& curCol, uint64_t totalRow, const RIDList uint64_t i = 0, curRowId; int dataFbo, dataBio, curDataFbo = -1; unsigned char dataBuf[BYTE_PER_BLOCK]; - bool bExit = false; int rc = NO_ERROR; - while (!bExit) + while (i < totalRow) { curRowId = ridList[i]; @@ -1845,9 +1844,6 @@ int ColumnOp::writeRowsReadOnly(Column& curCol, uint64_t totalRow, const RIDList } i++; - - if (i >= totalRow) - bExit = true; } return rc; diff --git a/writeengine/xml/we_xmljob.h b/writeengine/xml/we_xmljob.h index 0910bea0b..d212d0a98 100644 --- a/writeengine/xml/we_xmljob.h +++ b/writeengine/xml/we_xmljob.h @@ -84,6 +84,14 @@ class XMLJob : public XMLOp return fJob; } + /** + * @brief Get reference to job structure + */ + Job& getJob() + { + return fJob; + } + /** * @brief Load job information * @param fileName Name of Job XML file to be read