1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-4876 This patch enables continues buffer to be used by ColumnCommand and aligns BPP::blockData

that in most cases was unaligned
This commit is contained in:
Roman Nozdrin
2021-09-28 08:50:12 +00:00
parent c376472209
commit 3de038c1da
10 changed files with 297 additions and 511 deletions

View File

@ -1,4 +1,5 @@
/* Copyright (C) 2014 InfiniDB, Inc. /* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016-2021 MariaDB Corporation
This program is free software; you can redistribute it and/or This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License modify it under the terms of the GNU General Public License
@ -30,11 +31,7 @@
#include "calpontsystemcatalog.h" #include "calpontsystemcatalog.h"
#include "joblisttypes.h" #include "joblisttypes.h"
#ifdef __cplusplus
#include <vector> #include <vector>
extern "C"
{
#endif
#pragma pack(push,1) #pragma pack(push,1)
@ -66,6 +63,8 @@ const int8_t COMPARE_NLIKE = (COMPARE_LIKE | COMPARE_NOT); //0x18
namespace primitives namespace primitives
{ {
using RIDType = uint16_t;
using NVALSType = uint16_t;
using utils::ConstString; using utils::ConstString;
class StringComparator: public datatypes::Charset class StringComparator: public datatypes::Charset
@ -288,6 +287,7 @@ struct VBCPacketHeader
// Packet Header for ISM SubBlock EU // Packet Header for ISM SubBlock EU
// Changing this structure one !MUST! align ColResultHeader
struct ISMPacketHeader struct ISMPacketHeader
{ {
ISMPacketHeader(): Interleave(0), Flags(0), Command(0), Size(0), Type(0), MsgCount(0), Status(0) {} ISMPacketHeader(): Interleave(0), Flags(0), Command(0), Size(0), Type(0), MsgCount(0), Status(0) {}
@ -303,6 +303,7 @@ struct ISMPacketHeader
// Primitive request/response structure Header // Primitive request/response structure Header
//@Bug 2744 changed all variables to 32 bit, and took out StatementID //@Bug 2744 changed all variables to 32 bit, and took out StatementID
// Changing this structure one !MUST! align ColResultHeader
struct PrimitiveHeader struct PrimitiveHeader
{ {
uint32_t SessionID; // Front end Session Identifier uint32_t SessionID; // Front end Session Identifier
@ -459,19 +460,6 @@ struct LoopbackResultHeader
// Column Results // Column Results
struct ColResultHeader
{
PrimitiveHeader Hdr;
uint64_t LBID;
uint16_t RidFlags;
uint16_t NVALS;
uint16_t ValidMinMax; // 1 if Min/Max are valid, otherwise 0
uint32_t OutputType;
int64_t Min; // Minimum value in this block (signed)
int64_t Max; // Maximum value in this block (signed)
uint32_t CacheIO; // I/O count from buffer cache
uint32_t PhysicalIO; // Physical I/O count from disk
};
// Column Aggregate results // Column Aggregate results
@ -805,24 +793,53 @@ struct NewColAggRequestHeader
NewColAggRequestHeader(); // QQ: not used NewColAggRequestHeader(); // QQ: not used
}; };
struct NewColResultHeader // The size of the structure !MUST! be aligned by the max(sizeof(DataType)) supported by MCS.
struct ColResultHeader
{ {
ISMPacketHeader ism;
PrimitiveHeader hdr; PrimitiveHeader hdr;
uint64_t LBID;
uint16_t RidFlags;
uint16_t NVALS;
uint16_t ValidMinMax; // 1 if Min/Max are valid, otherwise 0
uint32_t OutputType;
int128_t Min; // Minimum value in this block for signed data types int128_t Min; // Minimum value in this block for signed data types
int128_t Max; // Maximum value in this block for signed data types int128_t Max; // Maximum value in this block for signed data types
ISMPacketHeader ism;
uint64_t LBID;
uint16_t RidFlags;
primitives::NVALSType NVALS;
uint16_t ValidMinMax; // 1 if Min/Max are valid, otherwise 0
uint32_t OutputType;
uint32_t CacheIO; // I/O count from buffer cache uint32_t CacheIO; // I/O count from buffer cache
uint32_t PhysicalIO; // Physical I/O count from disk uint32_t PhysicalIO; // Physical I/O count from disk
char padding[34];
// if OutputType was OT_DATAVALUE, what follows is DataType[NVALS] // if OutputType was OT_DATAVALUE, what follows is DataType[NVALS]
// if OutputType was OT_RID, what follows is uint16_t Rids[NVALS] // if OutputType was OT_RID, what follows is uint16_t Rids[NVALS]
// if OutputType was OT_BOTH, what follows is NVALS <Rid, DataType> pairs // if OutputType was OT_BOTH, what follows is uint16_t Rids[NVALS] DataType[NVALS]
}; };
namespace primitives
{
constexpr static uint32_t ColResultHeaderFirstValueOffset = sizeof(ColResultHeader) + sizeof(RIDType) * BLOCK_SIZE;
constexpr static uint32_t RID2FirstValueOffset = sizeof(RIDType) * BLOCK_SIZE;
template<typename T>
inline T* getValuesArrayPosition(uint8_t* out, const NVALSType offset)
{
return reinterpret_cast<T*>(out + offset * sizeof(T));
}
inline primitives::RIDType* getRIDArrayPosition(uint8_t* out, const NVALSType offset)
{
return getValuesArrayPosition<NVALSType>(out, offset);
}
inline uint8_t* getFirstValueArrayPosition(ColResultHeader* outMsg)
{
return reinterpret_cast<uint8_t*>(outMsg) + ColResultHeaderFirstValueOffset;
}
inline uint8_t* getFirstRIDArrayPosition(ColResultHeader* outMsg)
{
return reinterpret_cast<uint8_t*>(&outMsg[1]);
}
}
/* additional types to support p_dictionary */ /* additional types to support p_dictionary */
struct DictFilterElement struct DictFilterElement
{ {
@ -891,10 +908,6 @@ struct LbidAtVer
#pragma pack(pop) #pragma pack(pop)
#ifdef __cplusplus
}
#endif
#endif //JOBLIST_PRIMITIVE_H #endif //JOBLIST_PRIMITIVE_H
// vim:ts=4 sw=4: // vim:ts=4 sw=4:

View File

@ -926,240 +926,32 @@ inline bool nextColValue(
/// WRITE COLUMN VALUES /// WRITE COLUMN VALUES
/// ///
// Append value to the output buffer with debug-time check for buffer overflow
template<typename T>
inline void checkedWriteValue(
void* out,
unsigned outSize,
unsigned* outPos,
const T* src,
int errSubtype)
{
#ifdef PRIM_DEBUG
if (sizeof(T) > outSize - *outPos)
{
logIt(35, errSubtype);
throw logic_error("PrimitiveProcessor::checkedWriteValue(): output buffer is too small");
}
#endif
uint8_t* out8 = reinterpret_cast<uint8_t*>(out);
memcpy(out8 + *outPos, src, sizeof(T));
*outPos += sizeof(T);
}
// Write the value index in srcArray and/or the value itself, depending on bits in OutputType, // Write the value index in srcArray and/or the value itself, depending on bits in OutputType,
// into the output buffer and update the output pointer. // into the output buffer and update the output pointer.
// TODO Introduce another dispatching layer based on OutputType.
template<typename T> template<typename T>
inline void writeColValue( inline void writeColValue(
uint8_t OutputType, uint8_t OutputType,
NewColResultHeader* out, ColResultHeader* out,
unsigned outSize,
unsigned* written,
uint16_t rid, uint16_t rid,
const T* srcArray) const T* srcArray)
{ {
uint8_t* outPtr = reinterpret_cast<uint8_t*>(&out[1]);
auto idx = out->NVALS++;
if (OutputType & OT_RID) if (OutputType & OT_RID)
{ {
checkedWriteValue(out, outSize, written, &rid, 1); auto* outPos = getRIDArrayPosition(outPtr, idx);
*outPos = rid;
out->RidFlags |= (1 << (rid >> 9)); // set the (row/512)'th bit out->RidFlags |= (1 << (rid >> 9)); // set the (row/512)'th bit
} }
if (OutputType & (OT_TOKEN | OT_DATAVALUE)) if (OutputType & (OT_TOKEN | OT_DATAVALUE))
{ {
checkedWriteValue(out, outSize, written, &srcArray[rid], 2); T* outPos = getValuesArrayPosition<T>(primitives::getFirstValueArrayPosition(out), idx);
} // TODO check bytecode for the 16 byte type
*outPos = srcArray[rid];
out->NVALS++; //TODO: Can be computed at the end from *written value
}
/* WIP
template <bool WRITE_RID, bool WRITE_DATA, bool IS_NULL_VALUE_MATCHES, typename FILTER_ARRAY_T, typename RID_T, typename T>
void writeArray(
size_t dataSize,
const T* dataArray,
const RID_T* dataRid,
const FILTER_ARRAY_T *filterArray,
uint8_t* outbuf,
unsigned* written,
uint16_t* NVALS,
uint8_t* RidFlagsPtr,
T NULL_VALUE)
{
uint8_t* out = outbuf;
uint8_t RidFlags = *RidFlagsPtr;
for (size_t i = 0; i < dataSize; ++i)
{
//TODO: optimize handling of NULL values and flags by avoiding non-predictable jumps
if (dataArray[i]==NULL_VALUE? IS_NULL_VALUE_MATCHES : filterArray[i])
{
if (WRITE_RID)
{
copyValue(out, &dataRid[i], sizeof(RID_T));
out += sizeof(RID_T);
RidFlags |= (1 << (dataRid[i] >> 10)); // set the (row/1024)'th bit
}
if (WRITE_DATA)
{
copyValue(out, &dataArray[i], sizeof(T));
out += sizeof(T);
}
}
}
// Update number of written values, number of written bytes and out->RidFlags
int size1 = (WRITE_RID? sizeof(RID_T) : 0) + (WRITE_DATA? sizeof(T) : 0);
*NVALS += (out - outbuf) / size1;
*written += out - outbuf;
*RidFlagsPtr = RidFlags;
}
*/
/*****************************************************************************
*** RUN DATA THROUGH A COLUMN FILTER ****************************************
*****************************************************************************/
/* "Vertical" processing of the column filter:
1. load all data into temporary vector
2. process one filter element over entire vector before going to a next one
3. write records, that succesfully passed through the filter, to outbuf
*/
/*
template<typename T, ENUM_KIND KIND, typename VALTYPE>
void processArray(
// Source data
const T* srcArray,
size_t srcSize,
uint16_t* ridArray,
size_t ridSize, // Number of values in ridArray
// Filter description
int BOP,
prestored_set_t* filterSet, // Set of values for simple filters (any of values / none of them)
uint32_t filterCount, // Number of filter elements, each described by one entry in the following arrays:
uint8_t* filterCOPs, // comparison operation
int64_t* filterValues, // value to compare to
// Output buffer/stats
uint8_t* outbuf, // Pointer to the place for output data
unsigned* written, // Number of written bytes, that we need to update
uint16_t* NVALS, // Number of written values, that we need to update
uint8_t* RidFlagsPtr, // Pointer to out->RidFlags
// Processing parameters
bool WRITE_RID,
bool WRITE_DATA,
bool SKIP_EMPTY_VALUES,
T EMPTY_VALUE,
bool IS_NULL_VALUE_MATCHES,
T NULL_VALUE,
// Min/Max search
bool ValidMinMax,
VALTYPE* MinPtr,
VALTYPE* MaxPtr)
{
// Alloc temporary arrays
size_t inputSize = (ridArray? ridSize : srcSize);
// Temporary array with data to filter
std::vector<T> dataVec(inputSize);
auto dataArray = dataVec.data();
// Temporary array with RIDs of corresponding dataArray elements
std::vector<RID_T> dataRidVec(WRITE_RID? inputSize : 0);
auto dataRid = dataRidVec.data();
// Copy input data into temporary array, opt. storing RIDs, opt. skipping EMPTYs
size_t dataSize; // number of values copied into dataArray
if (ridArray != NULL)
{
SKIP_EMPTY_VALUES = true; // let findMinMaxArray() know that empty values will be skipped
dataSize = WRITE_RID? readArray<true, true,true>(srcArray, srcSize, dataArray, dataRid, ridArray, ridSize, EMPTY_VALUE)
: readArray<false,true,true>(srcArray, srcSize, dataArray, dataRid, ridArray, ridSize, EMPTY_VALUE);
}
else if (SKIP_EMPTY_VALUES)
{
dataSize = WRITE_RID? readArray<true, false,true>(srcArray, srcSize, dataArray, dataRid, ridArray, ridSize, EMPTY_VALUE)
: readArray<false,false,true>(srcArray, srcSize, dataArray, dataRid, ridArray, ridSize, EMPTY_VALUE);
}
else
{
dataSize = WRITE_RID? readArray<true, false,false>(srcArray, srcSize, dataArray, dataRid, ridArray, ridSize, EMPTY_VALUE)
: readArray<false,false,false>(srcArray, srcSize, dataArray, dataRid, ridArray, ridSize, EMPTY_VALUE);
}
// If required, find Min/Max values of the data
if (ValidMinMax)
{
SKIP_EMPTY_VALUES? findMinMaxArray<true> (dataSize, dataArray, MinPtr, MaxPtr, EMPTY_VALUE, NULL_VALUE)
: findMinMaxArray<false>(dataSize, dataArray, MinPtr, MaxPtr, EMPTY_VALUE, NULL_VALUE);
}
// Choose initial filterArray[i] value depending on the operation
bool initValue = false;
if (filterCount == 0) {initValue = true;}
else if (BOP_NONE == BOP) {initValue = false; BOP = BOP_OR;}
else if (BOP_OR == BOP) {initValue = false;}
else if (BOP_XOR == BOP) {initValue = false;}
else if (BOP_AND == BOP) {initValue = true;}
// Temporary array accumulating results of filtering for each record
std::vector<uint8_t> filterVec(dataSize, initValue);
auto filterArray = filterVec.data();
// Real type of column data, may be floating-point (used only for comparisons in the filtering)
using FLOAT_T = typename std::conditional<sizeof(T) == 8, double, float>::type;
using DATA_T = typename std::conditional<KIND_FLOAT == KIND, FLOAT_T, T>::type;
auto realDataArray = reinterpret_cast<DATA_T*>(dataArray);
// Evaluate column filter on elements of dataArray and store results into filterArray
if (filterSet != NULL && BOP == BOP_OR)
{
applySetFilter<BOP_OR>(dataSize, dataArray, filterSet, filterArray);
}
else if (filterSet != NULL && BOP == BOP_AND)
{
applySetFilter<BOP_AND>(dataSize, dataArray, filterSet, filterArray);
}
else
for (int i = 0; i < filterCount; ++i)
{
DATA_T cmp_value; // value for comparison, may be floating-point
copyValue(&cmp_value, &filterValues[i], sizeof(cmp_value));
switch(BOP)
{
case BOP_AND: applyFilterElement<BOP_AND>(filterCOPs[i], dataSize, realDataArray, cmp_value, filterArray); break;
case BOP_OR: applyFilterElement<BOP_OR> (filterCOPs[i], dataSize, realDataArray, cmp_value, filterArray); break;
case BOP_XOR: applyFilterElement<BOP_XOR>(filterCOPs[i], dataSize, realDataArray, cmp_value, filterArray); break;
default: idbassert(0);
}
}
}
// Copy filtered data and/or their RIDs into output buffer
if (WRITE_RID && WRITE_DATA)
{
IS_NULL_VALUE_MATCHES? writeArray<true,true,true> (dataSize, dataArray, dataRid, filterArray, outbuf, written, NVALS, RidFlagsPtr, NULL_VALUE)
: writeArray<true,true,false>(dataSize, dataArray, dataRid, filterArray, outbuf, written, NVALS, RidFlagsPtr, NULL_VALUE);
}
else if (WRITE_RID)
{
IS_NULL_VALUE_MATCHES? writeArray<true,false,true> (dataSize, dataArray, dataRid, filterArray, outbuf, written, NVALS, RidFlagsPtr, NULL_VALUE)
: writeArray<true,false,false>(dataSize, dataArray, dataRid, filterArray, outbuf, written, NVALS, RidFlagsPtr, NULL_VALUE);
}
else
{
IS_NULL_VALUE_MATCHES? writeArray<false,true,true> (dataSize, dataArray, dataRid, filterArray, outbuf, written, NVALS, RidFlagsPtr, NULL_VALUE)
: writeArray<false,true,false>(dataSize, dataArray, dataRid, filterArray, outbuf, written, NVALS, RidFlagsPtr, NULL_VALUE);
} }
} }
*/
// These two are templates update min/max values in the loop iterating the values in filterColumnData. // These two are templates update min/max values in the loop iterating the values in filterColumnData.
template<ENUM_KIND KIND, typename T, template<ENUM_KIND KIND, typename T,
@ -1194,9 +986,7 @@ inline void updateMinMax(T& Min, T& Max, T& curValue, NewColRequestHeader* in)
template<typename T, ENUM_KIND KIND> template<typename T, ENUM_KIND KIND>
void filterColumnData( void filterColumnData(
NewColRequestHeader* in, NewColRequestHeader* in,
NewColResultHeader* out, ColResultHeader* out,
unsigned outSize,
unsigned* written,
uint16_t* ridArray, uint16_t* ridArray,
const uint16_t ridSize, // Number of values in ridArray const uint16_t ridSize, // Number of values in ridArray
int* srcArray16, int* srcArray16,
@ -1284,7 +1074,7 @@ void filterColumnData(
{ {
// If NULL values match the filter, write curValue to the output buffer // If NULL values match the filter, write curValue to the output buffer
if (isNullValueMatches) if (isNullValueMatches)
writeColValue<T>(outputType, out, outSize, written, rid, srcArray); writeColValue<T>(outputType, out, rid, srcArray);
} }
else else
{ {
@ -1292,7 +1082,7 @@ void filterColumnData(
if (matchingColValue<KIND, COL_WIDTH, false>(curValue, columnFilterMode, filterSet, filterCount, if (matchingColValue<KIND, COL_WIDTH, false>(curValue, columnFilterMode, filterSet, filterCount,
filterCOPs, filterValues, filterRFs, in->colType, NULL_VALUE)) filterCOPs, filterValues, filterRFs, in->colType, NULL_VALUE))
{ {
writeColValue<T>(outputType, out, outSize, written, rid, srcArray); writeColValue<T>(outputType, out, rid, srcArray);
} }
// Update Min and Max if necessary. EMPTY/NULL values are processed in other branches. // Update Min and Max if necessary. EMPTY/NULL values are processed in other branches.
@ -1347,9 +1137,7 @@ template<typename T,
typename std::enable_if<sizeof(T) == sizeof(int32_t), T>::type* = nullptr> typename std::enable_if<sizeof(T) == sizeof(int32_t), T>::type* = nullptr>
#endif #endif
void PrimitiveProcessor::scanAndFilterTypeDispatcher(NewColRequestHeader* in, void PrimitiveProcessor::scanAndFilterTypeDispatcher(NewColRequestHeader* in,
NewColResultHeader* out, ColResultHeader* out)
unsigned outSize,
unsigned* written)
{ {
constexpr int W = sizeof(T); constexpr int W = sizeof(T);
auto dataType = (execplan::CalpontSystemCatalog::ColDataType) in->colType.DataType; auto dataType = (execplan::CalpontSystemCatalog::ColDataType) in->colType.DataType;
@ -1360,10 +1148,10 @@ void PrimitiveProcessor::scanAndFilterTypeDispatcher(NewColRequestHeader* in,
uint16_t* ridArray = in->getRIDArrayPtr(W); uint16_t* ridArray = in->getRIDArrayPtr(W);
const uint32_t itemsPerBlock = logicalBlockMode ? BLOCK_SIZE const uint32_t itemsPerBlock = logicalBlockMode ? BLOCK_SIZE
: BLOCK_SIZE / W; : BLOCK_SIZE / W;
filterColumnData<T, KIND_FLOAT>(in, out, outSize, written, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter); filterColumnData<T, KIND_FLOAT>(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter);
return; return;
} }
_scanAndFilterTypeDispatcher<T>(in, out, outSize, written); _scanAndFilterTypeDispatcher<T>(in, out);
} }
template<typename T, template<typename T,
@ -1377,9 +1165,7 @@ template<typename T,
typename std::enable_if<sizeof(T) == sizeof(int64_t), T>::type* = nullptr> typename std::enable_if<sizeof(T) == sizeof(int64_t), T>::type* = nullptr>
#endif #endif
void PrimitiveProcessor::scanAndFilterTypeDispatcher(NewColRequestHeader* in, void PrimitiveProcessor::scanAndFilterTypeDispatcher(NewColRequestHeader* in,
NewColResultHeader* out, ColResultHeader* out)
unsigned outSize,
unsigned* written)
{ {
constexpr int W = sizeof(T); constexpr int W = sizeof(T);
auto dataType = (execplan::CalpontSystemCatalog::ColDataType) in->colType.DataType; auto dataType = (execplan::CalpontSystemCatalog::ColDataType) in->colType.DataType;
@ -1389,10 +1175,10 @@ void PrimitiveProcessor::scanAndFilterTypeDispatcher(NewColRequestHeader* in,
uint16_t* ridArray = in->getRIDArrayPtr(W); uint16_t* ridArray = in->getRIDArrayPtr(W);
const uint32_t itemsPerBlock = logicalBlockMode ? BLOCK_SIZE const uint32_t itemsPerBlock = logicalBlockMode ? BLOCK_SIZE
: BLOCK_SIZE / W; : BLOCK_SIZE / W;
filterColumnData<T, KIND_FLOAT>(in, out, outSize, written, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter); filterColumnData<T, KIND_FLOAT>(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter);
return; return;
} }
_scanAndFilterTypeDispatcher<T>(in, out, outSize, written); _scanAndFilterTypeDispatcher<T>(in, out);
} }
template<typename T, template<typename T,
@ -1408,11 +1194,9 @@ template<typename T,
sizeof(T) == sizeof(int128_t), T>::type* = nullptr> sizeof(T) == sizeof(int128_t), T>::type* = nullptr>
#endif #endif
void PrimitiveProcessor::scanAndFilterTypeDispatcher(NewColRequestHeader* in, void PrimitiveProcessor::scanAndFilterTypeDispatcher(NewColRequestHeader* in,
NewColResultHeader* out, ColResultHeader* out)
unsigned outSize,
unsigned* written)
{ {
_scanAndFilterTypeDispatcher<T>(in, out, outSize, written); _scanAndFilterTypeDispatcher<T>(in, out);
} }
template<typename T, template<typename T,
@ -1426,9 +1210,7 @@ template<typename T,
typename std::enable_if<sizeof(T) == sizeof(int128_t), T>::type* = nullptr> typename std::enable_if<sizeof(T) == sizeof(int128_t), T>::type* = nullptr>
#endif #endif
void PrimitiveProcessor::_scanAndFilterTypeDispatcher(NewColRequestHeader* in, void PrimitiveProcessor::_scanAndFilterTypeDispatcher(NewColRequestHeader* in,
NewColResultHeader* out, ColResultHeader* out)
unsigned outSize,
unsigned* written)
{ {
constexpr int W = sizeof(T); constexpr int W = sizeof(T);
const uint16_t ridSize = in->NVALS; const uint16_t ridSize = in->NVALS;
@ -1436,7 +1218,7 @@ void PrimitiveProcessor::_scanAndFilterTypeDispatcher(NewColRequestHeader* in,
const uint32_t itemsPerBlock = logicalBlockMode ? BLOCK_SIZE const uint32_t itemsPerBlock = logicalBlockMode ? BLOCK_SIZE
: BLOCK_SIZE / W; : BLOCK_SIZE / W;
filterColumnData<T, KIND_DEFAULT>(in, out, outSize, written, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter); filterColumnData<T, KIND_DEFAULT>(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter);
} }
template<typename T, template<typename T,
@ -1450,9 +1232,7 @@ template<typename T,
typename std::enable_if<sizeof(T) <= sizeof(int64_t), T>::type* = nullptr> typename std::enable_if<sizeof(T) <= sizeof(int64_t), T>::type* = nullptr>
#endif #endif
void PrimitiveProcessor::_scanAndFilterTypeDispatcher(NewColRequestHeader* in, void PrimitiveProcessor::_scanAndFilterTypeDispatcher(NewColRequestHeader* in,
NewColResultHeader* out, ColResultHeader* out)
unsigned outSize,
unsigned* written)
{ {
constexpr int W = sizeof(T); constexpr int W = sizeof(T);
const uint16_t ridSize = in->NVALS; const uint16_t ridSize = in->NVALS;
@ -1466,24 +1246,23 @@ void PrimitiveProcessor::_scanAndFilterTypeDispatcher(NewColRequestHeader* in,
dataType == execplan::CalpontSystemCatalog::TEXT) && dataType == execplan::CalpontSystemCatalog::TEXT) &&
!isDictTokenScan(in)) !isDictTokenScan(in))
{ {
filterColumnData<T, KIND_TEXT>(in, out, outSize, written, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter); filterColumnData<T, KIND_TEXT>(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter);
return; return;
} }
if (datatypes::isUnsigned(dataType)) if (datatypes::isUnsigned(dataType))
{ {
using UT = typename std::conditional<std::is_unsigned<T>::value || datatypes::is_uint128_t<T>::value, T, typename datatypes::make_unsigned<T>::type>::type; using UT = typename std::conditional<std::is_unsigned<T>::value || datatypes::is_uint128_t<T>::value, T, typename datatypes::make_unsigned<T>::type>::type;
filterColumnData<UT, KIND_UNSIGNED>(in, out, outSize, written, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter); filterColumnData<UT, KIND_UNSIGNED>(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter);
return; return;
} }
filterColumnData<T, KIND_DEFAULT>(in, out, outSize, written, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter); filterColumnData<T, KIND_DEFAULT>(in, out, ridArray, ridSize, block, itemsPerBlock, parsedColumnFilter);
} }
// The entrypoint for block scanning and filtering. // The entrypoint for block scanning and filtering.
// The block is in in msg, out msg is used to store values|RIDs matched. // The block is in in msg, out msg is used to store values|RIDs matched.
template<typename T> template<typename T>
void PrimitiveProcessor::columnScanAndFilter(NewColRequestHeader* in, NewColResultHeader* out, void PrimitiveProcessor::columnScanAndFilter(NewColRequestHeader* in, ColResultHeader* out)
unsigned outSize, unsigned* written)
{ {
#ifdef PRIM_DEBUG #ifdef PRIM_DEBUG
auto markEvent = [&] (char eventChar) auto markEvent = [&] (char eventChar)
@ -1501,7 +1280,6 @@ void PrimitiveProcessor::columnScanAndFilter(NewColRequestHeader* in, NewColResu
out->ism.Command = COL_RESULTS; out->ism.Command = COL_RESULTS;
out->OutputType = in->OutputType; out->OutputType = in->OutputType;
out->RidFlags = 0; out->RidFlags = 0;
*written = sizeof(NewColResultHeader);
//...Initialize I/O counts; //...Initialize I/O counts;
out->CacheIO = 0; out->CacheIO = 0;
out->PhysicalIO = 0; out->PhysicalIO = 0;
@ -1523,22 +1301,22 @@ void PrimitiveProcessor::columnScanAndFilter(NewColRequestHeader* in, NewColResu
// Sort ridArray (the row index array) if there are RIDs with this in msg // Sort ridArray (the row index array) if there are RIDs with this in msg
in->sortRIDArrayIfNeeded(W); in->sortRIDArrayIfNeeded(W);
scanAndFilterTypeDispatcher<T>(in, out, outSize, written); scanAndFilterTypeDispatcher<T>(in, out);
#ifdef PRIM_DEBUG #ifdef PRIM_DEBUG
markEvent('C'); markEvent('C');
#endif #endif
} }
template template
void primitives::PrimitiveProcessor::columnScanAndFilter<int8_t>(NewColRequestHeader*, NewColResultHeader*, unsigned, unsigned*); void primitives::PrimitiveProcessor::columnScanAndFilter<int8_t>(NewColRequestHeader*, ColResultHeader*);
template template
void primitives::PrimitiveProcessor::columnScanAndFilter<int16_t>(NewColRequestHeader*, NewColResultHeader*, unsigned int, unsigned int*); void primitives::PrimitiveProcessor::columnScanAndFilter<int16_t>(NewColRequestHeader*, ColResultHeader*);
template template
void primitives::PrimitiveProcessor::columnScanAndFilter<int32_t>(NewColRequestHeader*, NewColResultHeader*, unsigned int, unsigned int*); void primitives::PrimitiveProcessor::columnScanAndFilter<int32_t>(NewColRequestHeader*, ColResultHeader*);
template template
void primitives::PrimitiveProcessor::columnScanAndFilter<int64_t>(NewColRequestHeader*, NewColResultHeader*, unsigned int, unsigned int*); void primitives::PrimitiveProcessor::columnScanAndFilter<int64_t>(NewColRequestHeader*, ColResultHeader*);
template template
void primitives::PrimitiveProcessor::columnScanAndFilter<int128_t>(NewColRequestHeader*, NewColResultHeader*, unsigned int, unsigned int*); void primitives::PrimitiveProcessor::columnScanAndFilter<int128_t>(NewColRequestHeader*, ColResultHeader*);
} // namespace primitives } // namespace primitives
// vim:ts=4 sw=4: // vim:ts=4 sw=4:

View File

@ -2472,7 +2472,7 @@ public:
PrimitiveProcessor pp; PrimitiveProcessor pp;
uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE]; uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE];
NewColRequestHeader* in; NewColRequestHeader* in;
NewColResultHeader* out; ColResultHeader* out;
uint8_t* results; uint8_t* results;
int fd; int fd;
uint32_t i, written; uint32_t i, written;
@ -2506,7 +2506,7 @@ public:
memset(output, 0, 4 * BLOCK_SIZE); memset(output, 0, 4 * BLOCK_SIZE);
in = reinterpret_cast<NewColRequestHeader*>(input); in = reinterpret_cast<NewColRequestHeader*>(input);
out = reinterpret_cast<NewColResultHeader*>(output); out = reinterpret_cast<ColResultHeader*>(output);
in->DataSize = 1; in->DataSize = 1;
in->DataType = CalpontSystemCatalog::CHAR; in->DataType = CalpontSystemCatalog::CHAR;
@ -2518,7 +2518,7 @@ public:
pp.setBlockPtr((int*) block); pp.setBlockPtr((int*) block);
pp.p_Col(in, out, 4 * BLOCK_SIZE, &written); pp.p_Col(in, out, 4 * BLOCK_SIZE, &written);
results = &output[sizeof(NewColResultHeader)]; results = &output[sizeof(ColResultHeader)];
// cout << "NVALS = " << out->NVALS << endl; // cout << "NVALS = " << out->NVALS << endl;
CPPUNIT_ASSERT(out->NVALS == 8160); CPPUNIT_ASSERT(out->NVALS == 8160);
@ -2536,7 +2536,7 @@ public:
PrimitiveProcessor pp; PrimitiveProcessor pp;
uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE]; uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE];
NewColRequestHeader* in; NewColRequestHeader* in;
NewColResultHeader* out; ColResultHeader* out;
uint16_t* results; uint16_t* results;
uint32_t written, i; uint32_t written, i;
int fd; int fd;
@ -2570,7 +2570,7 @@ public:
memset(output, 0, 4 * BLOCK_SIZE); memset(output, 0, 4 * BLOCK_SIZE);
in = reinterpret_cast<NewColRequestHeader*>(input); in = reinterpret_cast<NewColRequestHeader*>(input);
out = reinterpret_cast<NewColResultHeader*>(output); out = reinterpret_cast<ColResultHeader*>(output);
in->DataSize = 2; in->DataSize = 2;
in->DataType = CalpontSystemCatalog::INT; in->DataType = CalpontSystemCatalog::INT;
@ -2582,7 +2582,7 @@ public:
pp.setBlockPtr((int*) block); pp.setBlockPtr((int*) block);
pp.p_Col(in, out, 4 * BLOCK_SIZE, &written); pp.p_Col(in, out, 4 * BLOCK_SIZE, &written);
results = reinterpret_cast<uint16_t*>(&output[sizeof(NewColResultHeader)]); results = reinterpret_cast<uint16_t*>(&output[sizeof(ColResultHeader)]);
// cout << "NVALS = " << out->NVALS << endl; // cout << "NVALS = " << out->NVALS << endl;
CPPUNIT_ASSERT(out->NVALS == 4096); CPPUNIT_ASSERT(out->NVALS == 4096);
@ -2600,7 +2600,7 @@ public:
PrimitiveProcessor pp; PrimitiveProcessor pp;
uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE]; uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE];
NewColRequestHeader* in; NewColRequestHeader* in;
NewColResultHeader* out; ColResultHeader* out;
uint32_t* results; uint32_t* results;
uint32_t written, i; uint32_t written, i;
int fd; int fd;
@ -2634,7 +2634,7 @@ public:
memset(output, 0, 4 * BLOCK_SIZE); memset(output, 0, 4 * BLOCK_SIZE);
in = reinterpret_cast<NewColRequestHeader*>(input); in = reinterpret_cast<NewColRequestHeader*>(input);
out = reinterpret_cast<NewColResultHeader*>(output); out = reinterpret_cast<ColResultHeader*>(output);
in->DataSize = 4; in->DataSize = 4;
in->DataType = CalpontSystemCatalog::INT; in->DataType = CalpontSystemCatalog::INT;
@ -2646,7 +2646,7 @@ public:
pp.setBlockPtr((int*) block); pp.setBlockPtr((int*) block);
pp.p_Col(in, out, 4 * BLOCK_SIZE, &written); pp.p_Col(in, out, 4 * BLOCK_SIZE, &written);
results = reinterpret_cast<uint32_t*>(&output[sizeof(NewColResultHeader)]); results = reinterpret_cast<uint32_t*>(&output[sizeof(ColResultHeader)]);
// cout << "NVALS = " << out->NVALS << endl; // cout << "NVALS = " << out->NVALS << endl;
CPPUNIT_ASSERT(out->NVALS == 2048); CPPUNIT_ASSERT(out->NVALS == 2048);
@ -2664,7 +2664,7 @@ public:
PrimitiveProcessor pp; PrimitiveProcessor pp;
uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE]; uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE];
NewColRequestHeader* in; NewColRequestHeader* in;
NewColResultHeader* out; ColResultHeader* out;
u_int64_t* results; u_int64_t* results;
uint32_t written, i; uint32_t written, i;
int fd; int fd;
@ -2698,7 +2698,7 @@ public:
memset(output, 0, 4 * BLOCK_SIZE); memset(output, 0, 4 * BLOCK_SIZE);
in = reinterpret_cast<NewColRequestHeader*>(input); in = reinterpret_cast<NewColRequestHeader*>(input);
out = reinterpret_cast<NewColResultHeader*>(output); out = reinterpret_cast<ColResultHeader*>(output);
in->DataSize = 8; in->DataSize = 8;
in->DataType = CalpontSystemCatalog::INT; in->DataType = CalpontSystemCatalog::INT;
@ -2710,7 +2710,7 @@ public:
pp.setBlockPtr((int*) block); pp.setBlockPtr((int*) block);
pp.p_Col(in, out, 4 * BLOCK_SIZE, &written); pp.p_Col(in, out, 4 * BLOCK_SIZE, &written);
results = reinterpret_cast<u_int64_t*>(&output[sizeof(NewColResultHeader)]); results = reinterpret_cast<u_int64_t*>(&output[sizeof(ColResultHeader)]);
// cout << "NVALS = " << out->NVALS << endl; // cout << "NVALS = " << out->NVALS << endl;
CPPUNIT_ASSERT(out->NVALS == 1024); CPPUNIT_ASSERT(out->NVALS == 1024);
@ -2728,7 +2728,7 @@ public:
PrimitiveProcessor pp; PrimitiveProcessor pp;
uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE]; uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE];
NewColRequestHeader* in; NewColRequestHeader* in;
NewColResultHeader* out; ColResultHeader* out;
uint8_t* results; uint8_t* results;
uint16_t* rids; uint16_t* rids;
uint32_t written, i; uint32_t written, i;
@ -2763,7 +2763,7 @@ public:
memset(output, 0, 4 * BLOCK_SIZE); memset(output, 0, 4 * BLOCK_SIZE);
in = reinterpret_cast<NewColRequestHeader*>(input); in = reinterpret_cast<NewColRequestHeader*>(input);
out = reinterpret_cast<NewColResultHeader*>(output); out = reinterpret_cast<ColResultHeader*>(output);
rids = reinterpret_cast<uint16_t*>(&in[1]); rids = reinterpret_cast<uint16_t*>(&in[1]);
in->DataSize = 1; in->DataSize = 1;
@ -2778,7 +2778,7 @@ public:
pp.setBlockPtr((int*) block); pp.setBlockPtr((int*) block);
pp.p_Col(in, out, 4 * BLOCK_SIZE, &written); pp.p_Col(in, out, 4 * BLOCK_SIZE, &written);
results = reinterpret_cast<uint8_t*>(&output[sizeof(NewColResultHeader)]); results = reinterpret_cast<uint8_t*>(&output[sizeof(ColResultHeader)]);
// cout << "NVALS = " << out->NVALS << endl; // cout << "NVALS = " << out->NVALS << endl;
CPPUNIT_ASSERT(out->NVALS == 2); CPPUNIT_ASSERT(out->NVALS == 2);
@ -2796,7 +2796,7 @@ public:
PrimitiveProcessor pp; PrimitiveProcessor pp;
uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE]; uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE];
NewColRequestHeader* in; NewColRequestHeader* in;
NewColResultHeader* out; ColResultHeader* out;
ColArgs* args; ColArgs* args;
uint32_t* results; uint32_t* results;
uint32_t written, i; uint32_t written, i;
@ -2831,7 +2831,7 @@ public:
memset(output, 0, 4 * BLOCK_SIZE); memset(output, 0, 4 * BLOCK_SIZE);
in = reinterpret_cast<NewColRequestHeader*>(input); in = reinterpret_cast<NewColRequestHeader*>(input);
out = reinterpret_cast<NewColResultHeader*>(output); out = reinterpret_cast<ColResultHeader*>(output);
args = reinterpret_cast<ColArgs*>(&in[1]); args = reinterpret_cast<ColArgs*>(&in[1]);
in->DataSize = 4; in->DataSize = 4;
@ -2854,7 +2854,7 @@ public:
pp.setBlockPtr((int*) block); pp.setBlockPtr((int*) block);
pp.p_Col(in, out, 4 * BLOCK_SIZE, &written); pp.p_Col(in, out, 4 * BLOCK_SIZE, &written);
results = reinterpret_cast<uint32_t*>(&output[sizeof(NewColResultHeader)]); results = reinterpret_cast<uint32_t*>(&output[sizeof(ColResultHeader)]);
// cout << "NVALS = " << out->NVALS << endl; // cout << "NVALS = " << out->NVALS << endl;
CPPUNIT_ASSERT(out->NVALS == 9); CPPUNIT_ASSERT(out->NVALS == 9);
@ -2872,7 +2872,7 @@ public:
PrimitiveProcessor pp; PrimitiveProcessor pp;
uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE]; uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE];
NewColRequestHeader* in; NewColRequestHeader* in;
NewColResultHeader* out; ColResultHeader* out;
ColArgs* args; ColArgs* args;
u_int64_t* results; u_int64_t* results;
uint32_t written, i; uint32_t written, i;
@ -2908,7 +2908,7 @@ public:
memset(output, 0, 4 * BLOCK_SIZE); memset(output, 0, 4 * BLOCK_SIZE);
in = reinterpret_cast<NewColRequestHeader*>(input); in = reinterpret_cast<NewColRequestHeader*>(input);
out = reinterpret_cast<NewColResultHeader*>(output); out = reinterpret_cast<ColResultHeader*>(output);
args = reinterpret_cast<ColArgs*>(&input[sizeof(NewColRequestHeader)]); args = reinterpret_cast<ColArgs*>(&input[sizeof(NewColRequestHeader)]);
in->DataSize = 8; in->DataSize = 8;
@ -2931,7 +2931,7 @@ public:
pp.setBlockPtr((int*) block); pp.setBlockPtr((int*) block);
pp.p_Col(in, out, 4 * BLOCK_SIZE, &written); pp.p_Col(in, out, 4 * BLOCK_SIZE, &written);
results = reinterpret_cast<u_int64_t*>(&output[sizeof(NewColResultHeader)]); results = reinterpret_cast<u_int64_t*>(&output[sizeof(ColResultHeader)]);
// cout << "NVALS = " << out->NVALS << endl; // cout << "NVALS = " << out->NVALS << endl;
CPPUNIT_ASSERT(out->NVALS == 33); CPPUNIT_ASSERT(out->NVALS == 33);
@ -2949,7 +2949,7 @@ public:
PrimitiveProcessor pp; PrimitiveProcessor pp;
uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE]; uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE];
NewColRequestHeader* in; NewColRequestHeader* in;
NewColResultHeader* out; ColResultHeader* out;
ColArgs* args; ColArgs* args;
u_int64_t* results; u_int64_t* results;
uint32_t written, i; uint32_t written, i;
@ -2985,7 +2985,7 @@ public:
memset(output, 0, 4 * BLOCK_SIZE); memset(output, 0, 4 * BLOCK_SIZE);
in = reinterpret_cast<NewColRequestHeader*>(input); in = reinterpret_cast<NewColRequestHeader*>(input);
out = reinterpret_cast<NewColResultHeader*>(output); out = reinterpret_cast<ColResultHeader*>(output);
args = reinterpret_cast<ColArgs*>(&input[sizeof(NewColRequestHeader)]); args = reinterpret_cast<ColArgs*>(&input[sizeof(NewColRequestHeader)]);
in->DataSize = 8; in->DataSize = 8;
@ -3008,7 +3008,7 @@ public:
pp.setBlockPtr((int*) block); pp.setBlockPtr((int*) block);
pp.p_Col(in, out, 4 * BLOCK_SIZE, &written); pp.p_Col(in, out, 4 * BLOCK_SIZE, &written);
results = reinterpret_cast<u_int64_t*>(&output[sizeof(NewColResultHeader)]); results = reinterpret_cast<u_int64_t*>(&output[sizeof(ColResultHeader)]);
// cout << "NVALS = " << out->NVALS << endl; // cout << "NVALS = " << out->NVALS << endl;
CPPUNIT_ASSERT(out->NVALS == 2); CPPUNIT_ASSERT(out->NVALS == 2);
CPPUNIT_ASSERT(results[0] == 10); CPPUNIT_ASSERT(results[0] == 10);
@ -3026,7 +3026,7 @@ public:
PrimitiveProcessor pp; PrimitiveProcessor pp;
uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE]; uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE];
NewColRequestHeader* in; NewColRequestHeader* in;
NewColResultHeader* out; ColResultHeader* out;
ColArgs* args; ColArgs* args;
uint16_t* rids; uint16_t* rids;
u_int64_t* results; u_int64_t* results;
@ -3063,7 +3063,7 @@ public:
memset(output, 0, 4 * BLOCK_SIZE); memset(output, 0, 4 * BLOCK_SIZE);
in = reinterpret_cast<NewColRequestHeader*>(input); in = reinterpret_cast<NewColRequestHeader*>(input);
out = reinterpret_cast<NewColResultHeader*>(output); out = reinterpret_cast<ColResultHeader*>(output);
args = reinterpret_cast<ColArgs*>(&input[sizeof(NewColRequestHeader)]); args = reinterpret_cast<ColArgs*>(&input[sizeof(NewColRequestHeader)]);
in->DataSize = 8; in->DataSize = 8;
@ -3092,7 +3092,7 @@ public:
pp.setBlockPtr((int*) block); pp.setBlockPtr((int*) block);
pp.p_Col(in, out, 4 * BLOCK_SIZE, &written); pp.p_Col(in, out, 4 * BLOCK_SIZE, &written);
results = reinterpret_cast<u_int64_t*>(&output[sizeof(NewColResultHeader)]); results = reinterpret_cast<u_int64_t*>(&output[sizeof(ColResultHeader)]);
// cout << "NVALS = " << out->NVALS << endl; // cout << "NVALS = " << out->NVALS << endl;
CPPUNIT_ASSERT(out->NVALS == 1); CPPUNIT_ASSERT(out->NVALS == 1);
CPPUNIT_ASSERT(results[0] == 10); CPPUNIT_ASSERT(results[0] == 10);
@ -3109,7 +3109,7 @@ public:
PrimitiveProcessor pp; PrimitiveProcessor pp;
uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE]; uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE];
NewColRequestHeader* in; NewColRequestHeader* in;
NewColResultHeader* out; ColResultHeader* out;
ColArgs* args; ColArgs* args;
int16_t* results; int16_t* results;
uint32_t written, i; uint32_t written, i;
@ -3145,7 +3145,7 @@ public:
memset(output, 0, 4 * BLOCK_SIZE); memset(output, 0, 4 * BLOCK_SIZE);
in = reinterpret_cast<NewColRequestHeader*>(input); in = reinterpret_cast<NewColRequestHeader*>(input);
out = reinterpret_cast<NewColResultHeader*>(output); out = reinterpret_cast<ColResultHeader*>(output);
args = reinterpret_cast<ColArgs*>(&input[sizeof(NewColRequestHeader)]); args = reinterpret_cast<ColArgs*>(&input[sizeof(NewColRequestHeader)]);
in->DataSize = 8; in->DataSize = 8;
@ -3168,7 +3168,7 @@ public:
pp.setBlockPtr((int*) block); pp.setBlockPtr((int*) block);
pp.p_Col(in, out, 4 * BLOCK_SIZE, &written); pp.p_Col(in, out, 4 * BLOCK_SIZE, &written);
results = reinterpret_cast<int16_t*>(&output[sizeof(NewColResultHeader)]); results = reinterpret_cast<int16_t*>(&output[sizeof(ColResultHeader)]);
// cout << "NVALS = " << out->NVALS << endl; // cout << "NVALS = " << out->NVALS << endl;
CPPUNIT_ASSERT(out->NVALS == 33); CPPUNIT_ASSERT(out->NVALS == 33);
@ -3186,7 +3186,7 @@ public:
PrimitiveProcessor pp; PrimitiveProcessor pp;
uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE]; uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE];
NewColRequestHeader* in; NewColRequestHeader* in;
NewColResultHeader* out; ColResultHeader* out;
ColArgs* args; ColArgs* args;
int16_t* resultRid; int16_t* resultRid;
int64_t* resultVal; int64_t* resultVal;
@ -3223,7 +3223,7 @@ public:
memset(output, 0, 4 * BLOCK_SIZE); memset(output, 0, 4 * BLOCK_SIZE);
in = reinterpret_cast<NewColRequestHeader*>(input); in = reinterpret_cast<NewColRequestHeader*>(input);
out = reinterpret_cast<NewColResultHeader*>(output); out = reinterpret_cast<ColResultHeader*>(output);
args = reinterpret_cast<ColArgs*>(&input[sizeof(NewColRequestHeader)]); args = reinterpret_cast<ColArgs*>(&input[sizeof(NewColRequestHeader)]);
in->DataSize = 8; in->DataSize = 8;
@ -3252,7 +3252,7 @@ public:
for (i = 0; i < out->NVALS; i++) for (i = 0; i < out->NVALS; i++)
{ {
resultRid = reinterpret_cast<int16_t*>(&output[ resultRid = reinterpret_cast<int16_t*>(&output[
sizeof(NewColResultHeader) + i * (sizeof(Int16) + in->DataSize)]); sizeof(ColResultHeader) + i * (sizeof(Int16) + in->DataSize)]);
resultVal = reinterpret_cast<int64_t*>(&resultRid[1]); resultVal = reinterpret_cast<int64_t*>(&resultRid[1]);
// cout << i << ": rid:" << (int) *resultRid << " val:" << *resultVal << endl; // cout << i << ": rid:" << (int) *resultRid << " val:" << *resultVal << endl;
@ -3268,7 +3268,7 @@ public:
PrimitiveProcessor pp; PrimitiveProcessor pp;
uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE]; uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE];
NewColRequestHeader* in; NewColRequestHeader* in;
NewColResultHeader* out; ColResultHeader* out;
ColArgs* args; ColArgs* args;
uint8_t* results; uint8_t* results;
uint32_t written, i; uint32_t written, i;
@ -3303,7 +3303,7 @@ public:
memset(output, 0, 4 * BLOCK_SIZE); memset(output, 0, 4 * BLOCK_SIZE);
in = reinterpret_cast<NewColRequestHeader*>(input); in = reinterpret_cast<NewColRequestHeader*>(input);
out = reinterpret_cast<NewColResultHeader*>(output); out = reinterpret_cast<ColResultHeader*>(output);
in->DataSize = 1; in->DataSize = 1;
in->DataType = CalpontSystemCatalog::CHAR; in->DataType = CalpontSystemCatalog::CHAR;
@ -3326,7 +3326,7 @@ public:
pp.setBlockPtr((int*) block); pp.setBlockPtr((int*) block);
pp.p_Col(in, out, 4 * BLOCK_SIZE, &written); pp.p_Col(in, out, 4 * BLOCK_SIZE, &written);
results = &output[sizeof(NewColResultHeader)]; results = &output[sizeof(ColResultHeader)];
// cout << "NVALS = " << out->NVALS << endl; // cout << "NVALS = " << out->NVALS << endl;
CPPUNIT_ASSERT(out->NVALS == 32); CPPUNIT_ASSERT(out->NVALS == 32);
@ -3346,7 +3346,7 @@ public:
PrimitiveProcessor pp; PrimitiveProcessor pp;
uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE]; uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE];
NewColRequestHeader* in; NewColRequestHeader* in;
NewColResultHeader* out; ColResultHeader* out;
ColArgs* args; ColArgs* args;
int16_t* results; int16_t* results;
uint32_t written, i; uint32_t written, i;
@ -3383,7 +3383,7 @@ public:
memset(output, 0, 4 * BLOCK_SIZE); memset(output, 0, 4 * BLOCK_SIZE);
in = reinterpret_cast<NewColRequestHeader*>(input); in = reinterpret_cast<NewColRequestHeader*>(input);
out = reinterpret_cast<NewColResultHeader*>(output); out = reinterpret_cast<ColResultHeader*>(output);
args = reinterpret_cast<ColArgs*>(&input[sizeof(NewColRequestHeader)]); args = reinterpret_cast<ColArgs*>(&input[sizeof(NewColRequestHeader)]);
in->DataSize = 4; in->DataSize = 4;
@ -3420,7 +3420,7 @@ public:
pp.setBlockPtr((int*) block); pp.setBlockPtr((int*) block);
pp.p_Col(in, out, 4 * BLOCK_SIZE, &written); pp.p_Col(in, out, 4 * BLOCK_SIZE, &written);
results = reinterpret_cast<int16_t*>(&output[sizeof(NewColResultHeader)]); results = reinterpret_cast<int16_t*>(&output[sizeof(ColResultHeader)]);
// cout << "NVALS = " << out->NVALS << endl; // cout << "NVALS = " << out->NVALS << endl;
CPPUNIT_ASSERT(out->NVALS == 2); CPPUNIT_ASSERT(out->NVALS == 2);
CPPUNIT_ASSERT(results[0] == 8); CPPUNIT_ASSERT(results[0] == 8);
@ -3438,7 +3438,7 @@ public:
PrimitiveProcessor pp; PrimitiveProcessor pp;
uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE]; uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE];
NewColRequestHeader* in; NewColRequestHeader* in;
NewColResultHeader* out; ColResultHeader* out;
ColArgs* args; ColArgs* args;
double* results; double* results;
uint32_t written, i; uint32_t written, i;
@ -3474,7 +3474,7 @@ public:
memset(output, 0, 4 * BLOCK_SIZE); memset(output, 0, 4 * BLOCK_SIZE);
in = reinterpret_cast<NewColRequestHeader*>(input); in = reinterpret_cast<NewColRequestHeader*>(input);
out = reinterpret_cast<NewColResultHeader*>(output); out = reinterpret_cast<ColResultHeader*>(output);
in->DataSize = 8; in->DataSize = 8;
in->DataType = CalpontSystemCatalog::DOUBLE; in->DataType = CalpontSystemCatalog::DOUBLE;
@ -3496,7 +3496,7 @@ public:
pp.setBlockPtr((int*) block); pp.setBlockPtr((int*) block);
pp.p_Col(in, out, 4 * BLOCK_SIZE, &written); pp.p_Col(in, out, 4 * BLOCK_SIZE, &written);
results = reinterpret_cast<double*>(&output[sizeof(NewColResultHeader)]); results = reinterpret_cast<double*>(&output[sizeof(ColResultHeader)]);
// cout << "NVALS = " << out->NVALS << endl; // cout << "NVALS = " << out->NVALS << endl;
CPPUNIT_ASSERT(out->NVALS == 8); CPPUNIT_ASSERT(out->NVALS == 8);
@ -3514,7 +3514,7 @@ public:
PrimitiveProcessor pp; PrimitiveProcessor pp;
uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE]; uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE];
NewColRequestHeader* in; NewColRequestHeader* in;
NewColResultHeader* out; ColResultHeader* out;
ColArgs* args; ColArgs* args;
float* resultVal; float* resultVal;
uint32_t written, i; uint32_t written, i;
@ -3551,7 +3551,7 @@ public:
memset(output, 0, 4 * BLOCK_SIZE); memset(output, 0, 4 * BLOCK_SIZE);
in = reinterpret_cast<NewColRequestHeader*>(input); in = reinterpret_cast<NewColRequestHeader*>(input);
out = reinterpret_cast<NewColResultHeader*>(output); out = reinterpret_cast<ColResultHeader*>(output);
in->DataSize = 4; in->DataSize = 4;
in->DataType = CalpontSystemCatalog::FLOAT; in->DataType = CalpontSystemCatalog::FLOAT;
@ -3579,7 +3579,7 @@ public:
for (i = 0; i < out->NVALS; i++) for (i = 0; i < out->NVALS; i++)
{ {
resultRid = reinterpret_cast<int16_t*>(&output[ resultRid = reinterpret_cast<int16_t*>(&output[
sizeof(NewColResultHeader) + i * (sizeof(Int16) + in->DataSize)]); sizeof(ColResultHeader) + i * (sizeof(Int16) + in->DataSize)]);
resultVal = reinterpret_cast<float*>(&resultRid[1]); resultVal = reinterpret_cast<float*>(&resultRid[1]);
// cout << i << ": rid:" << (int) *resultRid << " val:" << *resultVal << endl; // cout << i << ": rid:" << (int) *resultRid << " val:" << *resultVal << endl;
@ -3595,7 +3595,7 @@ public:
PrimitiveProcessor pp; PrimitiveProcessor pp;
uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE]; uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE];
NewColRequestHeader* in; NewColRequestHeader* in;
NewColResultHeader* out; ColResultHeader* out;
ColArgs* args; ColArgs* args;
float* resultVal; float* resultVal;
int16_t* resultRid; int16_t* resultRid;
@ -3632,7 +3632,7 @@ public:
memset(output, 0, 4 * BLOCK_SIZE); memset(output, 0, 4 * BLOCK_SIZE);
in = reinterpret_cast<NewColRequestHeader*>(input); in = reinterpret_cast<NewColRequestHeader*>(input);
out = reinterpret_cast<NewColResultHeader*>(output); out = reinterpret_cast<ColResultHeader*>(output);
in->DataSize = 4; in->DataSize = 4;
in->DataType = CalpontSystemCatalog::FLOAT; in->DataType = CalpontSystemCatalog::FLOAT;
@ -3661,7 +3661,7 @@ public:
for (i = 0; i < out->NVALS; i++) for (i = 0; i < out->NVALS; i++)
{ {
resultRid = reinterpret_cast<int16_t*>(&output[ resultRid = reinterpret_cast<int16_t*>(&output[
sizeof(NewColResultHeader) + i * (sizeof(Int16) + in->DataSize)]); sizeof(ColResultHeader) + i * (sizeof(Int16) + in->DataSize)]);
resultVal = reinterpret_cast<float*>(&resultRid[1]); resultVal = reinterpret_cast<float*>(&resultRid[1]);
// cout << i << ": rid:" << (int) *resultRid << " val:" << *resultVal << endl; // cout << i << ": rid:" << (int) *resultRid << " val:" << *resultVal << endl;
@ -3677,7 +3677,7 @@ public:
PrimitiveProcessor pp; PrimitiveProcessor pp;
uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE]; uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE];
NewColRequestHeader* in; NewColRequestHeader* in;
NewColResultHeader* out; ColResultHeader* out;
ColArgs* args; ColArgs* args;
double* results; double* results;
uint32_t written, i; uint32_t written, i;
@ -3713,7 +3713,7 @@ public:
memset(output, 0, 4 * BLOCK_SIZE); memset(output, 0, 4 * BLOCK_SIZE);
in = reinterpret_cast<NewColRequestHeader*>(input); in = reinterpret_cast<NewColRequestHeader*>(input);
out = reinterpret_cast<NewColResultHeader*>(output); out = reinterpret_cast<ColResultHeader*>(output);
in->DataSize = 8; in->DataSize = 8;
in->DataType = CalpontSystemCatalog::DOUBLE; in->DataType = CalpontSystemCatalog::DOUBLE;
@ -3735,7 +3735,7 @@ public:
pp.setBlockPtr((int*) block); pp.setBlockPtr((int*) block);
pp.p_Col(in, out, 4 * BLOCK_SIZE, &written); pp.p_Col(in, out, 4 * BLOCK_SIZE, &written);
results = reinterpret_cast<double*>(&output[sizeof(NewColResultHeader)]); results = reinterpret_cast<double*>(&output[sizeof(ColResultHeader)]);
// cout << "NVALS = " << out->NVALS << endl; // cout << "NVALS = " << out->NVALS << endl;
CPPUNIT_ASSERT(out->NVALS == 19); CPPUNIT_ASSERT(out->NVALS == 19);
@ -3764,7 +3764,7 @@ public:
PrimitiveProcessor pp; PrimitiveProcessor pp;
uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE]; uint8_t input[BLOCK_SIZE], output[4 * BLOCK_SIZE], block[BLOCK_SIZE];
NewColRequestHeader* in; NewColRequestHeader* in;
NewColResultHeader* out; ColResultHeader* out;
ColArgs* args; ColArgs* args;
binary16* results; binary16* results;
uint32_t written, i; uint32_t written, i;
@ -3797,7 +3797,7 @@ public:
memset(output, 0, 4 * BLOCK_SIZE); memset(output, 0, 4 * BLOCK_SIZE);
in = reinterpret_cast<NewColRequestHeader*>(input); in = reinterpret_cast<NewColRequestHeader*>(input);
out = reinterpret_cast<NewColResultHeader*>(output); out = reinterpret_cast<ColResultHeader*>(output);
args = reinterpret_cast<ColArgs*>(&input[sizeof(NewColRequestHeader)]); args = reinterpret_cast<ColArgs*>(&input[sizeof(NewColRequestHeader)]);
in->DataSize = sizeof(binary16); in->DataSize = sizeof(binary16);
@ -3825,7 +3825,7 @@ public:
pp.setBlockPtr((int*) block); pp.setBlockPtr((int*) block);
pp.p_Col(in, out, 4 * BLOCK_SIZE, &written); pp.p_Col(in, out, 4 * BLOCK_SIZE, &written);
results = reinterpret_cast<binary16*>(&output[sizeof(NewColResultHeader)]); results = reinterpret_cast<binary16*>(&output[sizeof(ColResultHeader)]);
// cout << "NVALS = " << out->NVALS << endl; // cout << "NVALS = " << out->NVALS << endl;
CPPUNIT_ASSERT_EQUAL((uint16_t)3, out->NVALS); CPPUNIT_ASSERT_EQUAL((uint16_t)3, out->NVALS);
CPPUNIT_ASSERT_EQUAL((u_int64_t)10, results[0].uint64(0)); CPPUNIT_ASSERT_EQUAL((u_int64_t)10, results[0].uint64(0));
@ -3841,7 +3841,7 @@ public:
PrimitiveProcessor pp; PrimitiveProcessor pp;
uint8_t input[2 * BLOCK_SIZE], output[8 * BLOCK_SIZE], block[BLOCK_SIZE]; uint8_t input[2 * BLOCK_SIZE], output[8 * BLOCK_SIZE], block[BLOCK_SIZE];
NewColRequestHeader* in; NewColRequestHeader* in;
NewColResultHeader* out; ColResultHeader* out;
ColArgs* args; ColArgs* args;
binary32* results; binary32* results;
uint32_t written, i; uint32_t written, i;
@ -3876,7 +3876,7 @@ public:
memset(output, 0, 4 * BLOCK_SIZE); memset(output, 0, 4 * BLOCK_SIZE);
in = reinterpret_cast<NewColRequestHeader*>(input); in = reinterpret_cast<NewColRequestHeader*>(input);
out = reinterpret_cast<NewColResultHeader*>(output); out = reinterpret_cast<ColResultHeader*>(output);
args = reinterpret_cast<ColArgs*>(&input[sizeof(NewColRequestHeader)]); args = reinterpret_cast<ColArgs*>(&input[sizeof(NewColRequestHeader)]);
in->DataSize = sizeof(binary32); in->DataSize = sizeof(binary32);
@ -3907,7 +3907,7 @@ public:
pp.setBlockPtr((int*) block); pp.setBlockPtr((int*) block);
pp.p_Col(in, out, 4 * BLOCK_SIZE, &written); pp.p_Col(in, out, 4 * BLOCK_SIZE, &written);
results = reinterpret_cast<binary32*>(&output[sizeof(NewColResultHeader)]); results = reinterpret_cast<binary32*>(&output[sizeof(ColResultHeader)]);
// cout << "NVALS = " << out->NVALS << endl; // cout << "NVALS = " << out->NVALS << endl;
CPPUNIT_ASSERT_EQUAL((uint16_t)3, out->NVALS); CPPUNIT_ASSERT_EQUAL((uint16_t)3, out->NVALS);
// CPPUNIT_ASSERT_EQUAL((u_int64_t)10, results[0].uint64(0)); // CPPUNIT_ASSERT_EQUAL((u_int64_t)10, results[0].uint64(0));

View File

@ -376,7 +376,7 @@ public:
* an array of 'NOPS' defining the filter to apply (optional), * an array of 'NOPS' defining the filter to apply (optional),
* followed by an array of RIDs to apply the filter to (optional). * followed by an array of RIDs to apply the filter to (optional).
* @param out The buffer that will contain the results. On return, it will start with * @param out The buffer that will contain the results. On return, it will start with
* a NewColResultHeader, followed by the output type specified by in->OutputType. * a ColResultHeader, followed by the output type specified by in->OutputType.
* \li If OT_RID, it will be an array of RIDs * \li If OT_RID, it will be an array of RIDs
* \li If OT_DATAVALUE, it will be an array of matching data values stored in the column * \li If OT_DATAVALUE, it will be an array of matching data values stored in the column
* \li If OT_BOTH, it will be an array of <RID, DataValue> pairs * \li If OT_BOTH, it will be an array of <RID, DataValue> pairs
@ -385,38 +385,32 @@ public:
* number of bytes written to out. * number of bytes written to out.
* @note See PrimitiveMsg.h for the type definitions. * @note See PrimitiveMsg.h for the type definitions.
*/ */
void p_Col(NewColRequestHeader* in, NewColResultHeader* out, unsigned outSize, void p_Col(NewColRequestHeader* in, ColResultHeader* out, unsigned outSize,
unsigned* written); unsigned* written);
template<typename T, template<typename T,
typename std::enable_if<sizeof(T) == sizeof(int8_t) || typename std::enable_if<sizeof(T) == sizeof(int8_t) ||
sizeof(T) == sizeof(int16_t) || sizeof(T) == sizeof(int16_t) ||
sizeof(T) == sizeof(int128_t), T>::type* = nullptr> sizeof(T) == sizeof(int128_t), T>::type* = nullptr>
void scanAndFilterTypeDispatcher(NewColRequestHeader* in, NewColResultHeader* out, void scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out);
unsigned outSize, unsigned* written);
template<typename T, template<typename T,
typename std::enable_if<sizeof(T) == sizeof(int32_t), T>::type* = nullptr> typename std::enable_if<sizeof(T) == sizeof(int32_t), T>::type* = nullptr>
void scanAndFilterTypeDispatcher(NewColRequestHeader* in, NewColResultHeader* out, void scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out);
unsigned outSize, unsigned* written);
template<typename T, template<typename T,
typename std::enable_if<sizeof(T) == sizeof(int64_t), T>::type* = nullptr> typename std::enable_if<sizeof(T) == sizeof(int64_t), T>::type* = nullptr>
void scanAndFilterTypeDispatcher(NewColRequestHeader* in, NewColResultHeader* out, void scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out);
unsigned outSize, unsigned* written);
template<typename T, template<typename T,
typename std::enable_if<sizeof(T) <= sizeof(int64_t), T>::type* = nullptr> typename std::enable_if<sizeof(T) <= sizeof(int64_t), T>::type* = nullptr>
void _scanAndFilterTypeDispatcher(NewColRequestHeader* in, NewColResultHeader* out, void _scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out);
unsigned outSize, unsigned* written);
template<typename T, template<typename T,
typename std::enable_if<sizeof(T) == sizeof(int128_t), T>::type* = nullptr> typename std::enable_if<sizeof(T) == sizeof(int128_t), T>::type* = nullptr>
void _scanAndFilterTypeDispatcher(NewColRequestHeader* in, NewColResultHeader* out, void _scanAndFilterTypeDispatcher(NewColRequestHeader* in, ColResultHeader* out);
unsigned outSize, unsigned* written);
template<typename T> template<typename T>
void columnScanAndFilter(NewColRequestHeader* in, NewColResultHeader* out, void columnScanAndFilter(NewColRequestHeader* in, ColResultHeader* out);
unsigned outSize, unsigned* written);
boost::shared_ptr<ParsedColumnFilter> parseColumnFilter(const uint8_t* filterString, boost::shared_ptr<ParsedColumnFilter> parseColumnFilter(const uint8_t* filterString,
uint32_t colWidth, uint32_t colWidth,

View File

@ -37,6 +37,7 @@
#include <string> #include <string>
#include <sstream> #include <sstream>
#include <set> #include <set>
#include <stdlib.h>
using namespace std; using namespace std;
#include <boost/thread.hpp> #include <boost/thread.hpp>
@ -919,7 +920,7 @@ void BatchPrimitiveProcessor::initProcessor()
strValues.reset(new string[LOGICAL_BLOCK_RIDS]); strValues.reset(new string[LOGICAL_BLOCK_RIDS]);
outMsgSize = defaultBufferSize; outMsgSize = defaultBufferSize;
outputMsg.reset(new uint8_t[outMsgSize]); outputMsg.reset(reinterpret_cast<uint8_t*>(aligned_alloc(utils::MAXCOLUMNWIDTH, outMsgSize)));
if (ot == ROW_GROUP) if (ot == ROW_GROUP)
{ {

View File

@ -225,7 +225,7 @@ private:
uint16_t wideColumnsWidths; uint16_t wideColumnsWidths;
/* Common space for primitive data */ /* Common space for primitive data */
uint8_t blockData[BLOCK_SIZE * utils::MAXCOLUMNWIDTH]; alignas(utils::MAXCOLUMNWIDTH) uint8_t blockData[BLOCK_SIZE * utils::MAXCOLUMNWIDTH];
boost::scoped_array<uint8_t> outputMsg; boost::scoped_array<uint8_t> outputMsg;
uint32_t outMsgSize; uint32_t outMsgSize;

View File

@ -310,8 +310,9 @@ template<int W>
void ColumnCommand::_issuePrimitive() void ColumnCommand::_issuePrimitive()
{ {
using IntegralType = typename datatypes::WidthToSIntegralType<W>::type; using IntegralType = typename datatypes::WidthToSIntegralType<W>::type;
uint32_t resultSize; // Down the call stack the code presumes outMsg buffer has enough space to store
bpp->getPrimitiveProcessor().columnScanAndFilter<IntegralType>(primMsg, outMsg, bpp->getOutMsgSize(), (unsigned int*)&resultSize); // ColRequestHeader + uint16_t Rids[8192] + IntegralType[8192].
bpp->getPrimitiveProcessor().columnScanAndFilter<IntegralType>(primMsg, outMsg);
} // _issuePrimitive() } // _issuePrimitive()
void ColumnCommand::updateCPDataNarrow() void ColumnCommand::updateCPDataNarrow()
@ -360,34 +361,33 @@ void ColumnCommand::_process_OT_BOTH_wAbsRids()
using T = typename datatypes::WidthToSIntegralType<W>::type; using T = typename datatypes::WidthToSIntegralType<W>::type;
bpp->ridCount = outMsg->NVALS; bpp->ridCount = outMsg->NVALS;
bpp->ridMap = outMsg->RidFlags; bpp->ridMap = outMsg->RidFlags;
size_t pos = sizeof(NewColResultHeader); uint8_t* outPtr = reinterpret_cast<uint8_t*>(&outMsg[1]);
auto* ridPos = primitives::getRIDArrayPosition(outPtr, 0);
T* valuesPos = primitives::getValuesArrayPosition<T>(primitives::getFirstValueArrayPosition(outMsg), 0);
for (size_t i = 0; i < outMsg->NVALS; ++i) for (size_t i = 0; i < outMsg->NVALS; ++i)
{ {
bpp->absRids[i] = *((uint16_t*) &bpp->outputMsg[pos]) + bpp->baseRid; bpp->relRids[i] = ridPos[i];
bpp->absRids[i] = ridPos[i] + bpp->baseRid;
bpp->relRids[i] = *((uint16_t*) &bpp->outputMsg[pos]); values[i] = valuesPos[i];
pos += 2;
values[i] = *((T*) &bpp->outputMsg[pos]);
pos += W;
} }
} }
template<> template<>
void ColumnCommand::_process_OT_BOTH_wAbsRids<16>() void ColumnCommand::_process_OT_BOTH_wAbsRids<16>()
{ {
using T = typename datatypes::WidthToSIntegralType<16>::type;
bpp->ridCount = outMsg->NVALS; bpp->ridCount = outMsg->NVALS;
bpp->ridMap = outMsg->RidFlags; bpp->ridMap = outMsg->RidFlags;
size_t pos = sizeof(NewColResultHeader); uint8_t* outPtr = reinterpret_cast<uint8_t*>(&outMsg[1]);
auto* ridPos = primitives::getRIDArrayPosition(outPtr, 0);
int128_t* valuesPos = primitives::getValuesArrayPosition<T>(primitives::getFirstValueArrayPosition(outMsg), 0);
for (size_t i = 0; i < outMsg->NVALS; ++i) for (size_t i = 0; i < outMsg->NVALS; ++i)
{ {
bpp->absRids[i] = *((uint16_t*) &bpp->outputMsg[pos]) + bpp->baseRid; bpp->relRids[i] = ridPos[i];
bpp->absRids[i] = ridPos[i] + bpp->baseRid;
bpp->relRids[i] = *((uint16_t*) &bpp->outputMsg[pos]); datatypes::TSInt128::assignPtrPtr(&wide128Values[i], &valuesPos[i]);
pos += 2;
datatypes::TSInt128::assignPtrPtr(&wide128Values[i], &bpp->outputMsg[pos]);
pos += 16;
} }
} }
@ -399,30 +399,31 @@ void ColumnCommand::_process_OT_BOTH()
bpp->ridCount = outMsg->NVALS; bpp->ridCount = outMsg->NVALS;
bpp->ridCount = outMsg->NVALS; bpp->ridCount = outMsg->NVALS;
bpp->ridMap = outMsg->RidFlags; bpp->ridMap = outMsg->RidFlags;
size_t pos = sizeof(NewColResultHeader); uint8_t* outPtr = reinterpret_cast<uint8_t*>(&outMsg[1]);
auto* ridPos = primitives::getRIDArrayPosition(outPtr, 0);
T* valuesPos = primitives::getValuesArrayPosition<T>(primitives::getFirstValueArrayPosition(outMsg), 0);
for (size_t i = 0; i < outMsg->NVALS; ++i) for (size_t i = 0; i < outMsg->NVALS; ++i)
{ {
bpp->relRids[i] = *((uint16_t*) &bpp->outputMsg[pos]); bpp->relRids[i] = ridPos[i];
pos += 2; values[i] = valuesPos[i];
values[i] = *((T*) &bpp->outputMsg[pos]);
pos += W;
} }
} }
template<> template<>
void ColumnCommand::_process_OT_BOTH<16>() void ColumnCommand::_process_OT_BOTH<16>()
{ {
using T = typename datatypes::WidthToSIntegralType<16>::type;
bpp->ridCount = outMsg->NVALS; bpp->ridCount = outMsg->NVALS;
bpp->ridMap = outMsg->RidFlags; bpp->ridMap = outMsg->RidFlags;
size_t pos = sizeof(NewColResultHeader); uint8_t* outPtr = reinterpret_cast<uint8_t*>(&outMsg[1]);
auto* ridPos = primitives::getRIDArrayPosition(outPtr, 0);
T* valuesPos = primitives::getValuesArrayPosition<T>(primitives::getFirstValueArrayPosition(outMsg), 0);
for (size_t i = 0; i < outMsg->NVALS; ++i) for (size_t i = 0; i < outMsg->NVALS; ++i)
{ {
bpp->relRids[i] = *((uint16_t*) &bpp->outputMsg[pos]); bpp->relRids[i] = ridPos[i];
pos += 2; datatypes::TSInt128::assignPtrPtr(&wide128Values[i], &valuesPos[i]);
datatypes::TSInt128::assignPtrPtr(&wide128Values[i], &bpp->outputMsg[pos]);
pos += 16;
} }
} }
@ -487,6 +488,8 @@ void ColumnCommand::process_OT_RID()
bpp->ridMap = outMsg->RidFlags; bpp->ridMap = outMsg->RidFlags;
} }
// TODO This spec makes an impicit type conversion to fit types with sizeof(type) <= 4
// into 8 bytes. Treat values as a buffer and use memcpy to store the values in one go.
template<int W> template<int W>
void ColumnCommand::_process_OT_DATAVALUE() void ColumnCommand::_process_OT_DATAVALUE()
{ {
@ -689,18 +692,17 @@ void ColumnCommand::prep(int8_t outputType, bool absRids)
void ColumnCommand::fillInPrimitiveMessageHeader(const int8_t outputType, const bool absRids) void ColumnCommand::fillInPrimitiveMessageHeader(const int8_t outputType, const bool absRids)
{ {
// WIP Align this structure or move input RIDs away.
baseMsgLength = sizeof(NewColRequestHeader) + baseMsgLength = sizeof(NewColRequestHeader) +
(suppressFilter ? 0 : filterString.length()); (suppressFilter ? 0 : filterString.length());
size_t inputMsgBufSize = baseMsgLength + (LOGICAL_BLOCK_RIDS * sizeof(primitives::RIDType));
if (!inputMsg) if (!inputMsg)
inputMsg.reset(new uint8_t[baseMsgLength + (LOGICAL_BLOCK_RIDS * 2)]); inputMsg.reset(reinterpret_cast<uint8_t*>(aligned_alloc(utils::MAXCOLUMNWIDTH,
inputMsgBufSize)));
primMsg = (NewColRequestHeader*) inputMsg.get(); primMsg = (NewColRequestHeader*) inputMsg.get();
outMsg = (NewColResultHeader*) bpp->outputMsg.get(); outMsg = (ColResultHeader*) bpp->outputMsg.get();
makeAbsRids = absRids;
primMsg = (NewColRequestHeader*) inputMsg.get();
outMsg = (NewColResultHeader*) bpp->outputMsg.get();
makeAbsRids = absRids; makeAbsRids = absRids;
primMsg->ism.Interleave = 0; primMsg->ism.Interleave = 0;
primMsg->ism.Flags = 0; primMsg->ism.Flags = 0;
@ -721,7 +723,8 @@ void ColumnCommand::fillInPrimitiveMessageHeader(const int8_t outputType, const
/* Assumes OT_DATAVALUE */ /* Assumes OT_DATAVALUE */
void ColumnCommand::projectResult() void ColumnCommand::projectResult()
{ {
if (primMsg->NVALS != outMsg->NVALS || outMsg->NVALS != bpp->ridCount ) auto nvals = outMsg->NVALS;
if (primMsg->NVALS != nvals || nvals != bpp->ridCount )
{ {
ostringstream os; ostringstream os;
BRM::DBRM brm; BRM::DBRM brm;
@ -739,19 +742,20 @@ void ColumnCommand::projectResult()
else else
os << ": ridcount " << bpp->ridCount; os << ": ridcount " << bpp->ridCount;
os << ", output rids " << outMsg->NVALS << endl; os << ", output rids " << nvals << endl;
//cout << os.str(); //cout << os.str();
if (bpp->sessionID & 0x80000000) if (bpp->sessionID & 0x80000000)
throw NeedToRestartJob(os.str()); throw NeedToRestartJob(os.str());
else else
throw PrimitiveColumnProjectResultExcept(os.str()); throw PrimitiveColumnProjectResultExcept(os.str());
} }
idbassert(primMsg->NVALS == outMsg->NVALS); idbassert(primMsg->NVALS == nvals);
idbassert(outMsg->NVALS == bpp->ridCount); idbassert(bpp->ridCount == nvals);
*bpp->serialized << (uint32_t) (outMsg->NVALS * colType.colWidth); uint32_t valuesByteSize = nvals * colType.colWidth;
bpp->serialized->append((uint8_t*) (outMsg + 1), outMsg->NVALS * colType.colWidth); *bpp->serialized << valuesByteSize;
bpp->serialized->append(primitives::getFirstValueArrayPosition(outMsg), valuesByteSize);
} }
void ColumnCommand::removeRowsFromRowGroup(RowGroup& rg) void ColumnCommand::removeRowsFromRowGroup(RowGroup& rg)
@ -796,30 +800,26 @@ void ColumnCommand::removeRowsFromRowGroup(RowGroup& rg)
primMsg->NVALS = outMsg->NVALS; primMsg->NVALS = outMsg->NVALS;
} }
template<int W> template<typename T>
void ColumnCommand::_projectResultRGLoop(rowgroup::Row& r, void ColumnCommand::_projectResultRGLoop(rowgroup::Row& r,
uint8_t* msg8, const T* valuesArray,
const uint32_t gapSize, const uint32_t offset)
const uint32_t offset)
{ {
using T = typename datatypes::WidthToSIntegralType<W>::type; for (size_t i = 0; i < outMsg->NVALS; ++i)
for (size_t i = 0; i < outMsg->NVALS; ++i, msg8 += gapSize)
{ {
r.setUintField_offset<W>(*((T*) msg8), offset); r.setIntField_offset(valuesArray[i], offset);
r.nextRow(rowSize); r.nextRow(rowSize);
} }
} }
template<> template<>
void ColumnCommand::_projectResultRGLoop<16>(rowgroup::Row& r, void ColumnCommand::_projectResultRGLoop<int128_t>(rowgroup::Row& r,
uint8_t* msg8, const int128_t* valuesArray,
const uint32_t gapSize, const uint32_t offset)
const uint32_t offset)
{ {
using T = typename datatypes::WidthToSIntegralType<16>::type; for (size_t i = 0; i < outMsg->NVALS; ++i)
for (size_t i = 0; i < outMsg->NVALS; ++i, msg8 += gapSize)
{ {
r.setBinaryField_offset((T*)msg8, colType.colWidth, offset); r.setBinaryField_offset(&valuesArray[i], colType.colWidth, offset);
r.nextRow(rowSize); r.nextRow(rowSize);
} }
} }
@ -827,24 +827,16 @@ void ColumnCommand::_projectResultRGLoop<16>(rowgroup::Row& r,
template<int W> template<int W>
void ColumnCommand::_projectResultRG(RowGroup& rg, uint32_t pos) void ColumnCommand::_projectResultRG(RowGroup& rg, uint32_t pos)
{ {
uint32_t offset, gapSize; using T = typename datatypes::WidthToSIntegralType<W>::type;
uint8_t* msg8 = (uint8_t*) (outMsg + 1); T* valuesArray = primitives::getValuesArrayPosition<T>(primitives::getFirstValueArrayPosition(outMsg), 0);
uint32_t offset;
auto nvals = outMsg->NVALS;
if (noVB)
{
// outMsg has rids in this case
msg8 += 2;
gapSize = colType.colWidth + 2;
}
else
gapSize = colType.colWidth;
/* TODO: reoptimize these away */
rg.initRow(&r); rg.initRow(&r);
offset = r.getOffset(pos); offset = r.getOffset(pos);
rowSize = r.getSize(); rowSize = r.getSize();
if ((primMsg->NVALS != outMsg->NVALS || outMsg->NVALS != bpp->ridCount) && (!noVB || bpp->sessionID & 0x80000000)) if ((primMsg->NVALS != nvals || nvals != bpp->ridCount) && (!noVB || bpp->sessionID & 0x80000000))
{ {
ostringstream os; ostringstream os;
BRM::DBRM brm; BRM::DBRM brm;
@ -857,12 +849,12 @@ void ColumnCommand::_projectResultRG(RowGroup& rg, uint32_t pos)
os << __FILE__ << " error on projectResultRG for oid " << oid << " lbid " << lbid; os << __FILE__ << " error on projectResultRG for oid " << oid << " lbid " << lbid;
if (primMsg->NVALS != outMsg->NVALS ) if (primMsg->NVALS != nvals )
os << ": input rids " << primMsg->NVALS; os << ": input rids " << primMsg->NVALS;
else else
os << ": ridcount " << bpp->ridCount; os << ": ridcount " << bpp->ridCount;
os << ", output rids " << outMsg->NVALS; os << ", output rids " << nvals;
os << endl; os << endl;
@ -871,14 +863,14 @@ void ColumnCommand::_projectResultRG(RowGroup& rg, uint32_t pos)
else else
throw PrimitiveColumnProjectResultExcept(os.str()); throw PrimitiveColumnProjectResultExcept(os.str());
} }
else if (primMsg->NVALS != outMsg->NVALS || outMsg->NVALS != bpp->ridCount) else if (primMsg->NVALS != nvals || nvals != bpp->ridCount)
removeRowsFromRowGroup(rg); removeRowsFromRowGroup(rg);
idbassert(primMsg->NVALS == outMsg->NVALS); idbassert(primMsg->NVALS == nvals);
idbassert(outMsg->NVALS == bpp->ridCount); idbassert(bpp->ridCount == nvals);
rg.getRow(0, &r); rg.getRow(0, &r);
_projectResultRGLoop<W>(r, msg8, gapSize, offset); _projectResultRGLoop(r, valuesArray, offset);
} }
void ColumnCommand::projectResultRG(RowGroup& rg, uint32_t pos) void ColumnCommand::projectResultRG(RowGroup& rg, uint32_t pos)

View File

@ -143,10 +143,9 @@ protected:
void _process_OT_DATAVALUE(); void _process_OT_DATAVALUE();
void process_OT_ROWGROUP(); void process_OT_ROWGROUP();
void projectResult(); void projectResult();
template<int W> template<typename T>
void _projectResultRGLoop(rowgroup::Row& r, void _projectResultRGLoop(rowgroup::Row& r,
uint8_t* msg8, const T* valuesArray,
const uint32_t gapSize,
const uint32_t offset); const uint32_t offset);
template<int W> template<int W>
void _projectResultRG(rowgroup::RowGroup& rg, uint32_t pos); void _projectResultRG(rowgroup::RowGroup& rg, uint32_t pos);
@ -164,7 +163,7 @@ protected:
boost::scoped_array<uint8_t> inputMsg; boost::scoped_array<uint8_t> inputMsg;
NewColRequestHeader* primMsg; NewColRequestHeader* primMsg;
NewColResultHeader* outMsg; ColResultHeader* outMsg;
// the length of base prim msg, which is everything up to the // the length of base prim msg, which is everything up to the
// rid array for the pCol message // rid array for the pCol message

View File

@ -38,16 +38,15 @@ using namespace std;
class ColumnScanFilterTest : public ::testing::Test class ColumnScanFilterTest : public ::testing::Test
{ {
protected: protected:
PrimitiveProcessor pp; PrimitiveProcessor pp;
uint8_t input[BLOCK_SIZE]; uint8_t input[BLOCK_SIZE];
uint8_t output[4 * BLOCK_SIZE]; uint8_t output[4 * BLOCK_SIZE];
uint8_t block[BLOCK_SIZE]; uint8_t block[BLOCK_SIZE];
uint16_t* rids; uint16_t* rids;
uint32_t i; uint32_t i;
uint32_t written;
NewColRequestHeader* in; NewColRequestHeader* in;
NewColResultHeader* out; ColResultHeader* out;
ColArgs* args; ColArgs* args;
void SetUp() override void SetUp() override
@ -55,7 +54,7 @@ class ColumnScanFilterTest : public ::testing::Test
memset(input, 0, BLOCK_SIZE); memset(input, 0, BLOCK_SIZE);
memset(output, 0, 4 * BLOCK_SIZE); memset(output, 0, 4 * BLOCK_SIZE);
in = reinterpret_cast<NewColRequestHeader*>(input); in = reinterpret_cast<NewColRequestHeader*>(input);
out = reinterpret_cast<NewColResultHeader*>(output); out = reinterpret_cast<ColResultHeader*>(output);
rids = reinterpret_cast<uint16_t*>(&in[1]); rids = reinterpret_cast<uint16_t*>(&in[1]);
args = reinterpret_cast<ColArgs*>(&in[1]); args = reinterpret_cast<ColArgs*>(&in[1]);
} }
@ -121,7 +120,8 @@ TEST_F(ColumnScanFilterTest, ColumnScan1Byte)
{ {
constexpr const uint8_t W = 1; constexpr const uint8_t W = 1;
using IntegralType = datatypes::WidthToSIntegralType<W>::type; using IntegralType = datatypes::WidthToSIntegralType<W>::type;
datatypes::make_unsigned<IntegralType>::type* results; using UT = datatypes::make_unsigned<IntegralType>::type;
UT* results;
in->colType = ColRequestHeaderDataType(); in->colType = ColRequestHeaderDataType();
in->colType.DataSize = 1; in->colType.DataSize = 1;
in->colType.DataType = SystemCatalog::CHAR; in->colType.DataType = SystemCatalog::CHAR;
@ -130,9 +130,9 @@ TEST_F(ColumnScanFilterTest, ColumnScan1Byte)
in->NVALS = 0; in->NVALS = 0;
pp.setBlockPtr((int*) readBlockFromLiteralArray("col1block.cdf", block)); pp.setBlockPtr((int*) readBlockFromLiteralArray("col1block.cdf", block));
pp.columnScanAndFilter<IntegralType>(in, out, 4 * BLOCK_SIZE, &written); pp.columnScanAndFilter<IntegralType>(in, out);
results = &output[sizeof(NewColResultHeader)]; results = getValuesArrayPosition<UT>(getFirstValueArrayPosition(out), 0);
EXPECT_EQ(out->NVALS, 8160); EXPECT_EQ(out->NVALS, 8160);
for (i = 0; i < 300; i++) for (i = 0; i < 300; i++)
@ -144,7 +144,8 @@ TEST_F(ColumnScanFilterTest, ColumnScan2Bytes)
{ {
constexpr const uint8_t W = 2; constexpr const uint8_t W = 2;
using IntegralType = datatypes::WidthToSIntegralType<W>::type; using IntegralType = datatypes::WidthToSIntegralType<W>::type;
datatypes::make_unsigned<IntegralType>::type* results; using UT = datatypes::make_unsigned<IntegralType>::type;
UT* results;
in->colType.DataSize = W; in->colType.DataSize = W;
in->colType.DataType = SystemCatalog::INT; in->colType.DataType = SystemCatalog::INT;
in->OutputType = OT_DATAVALUE; in->OutputType = OT_DATAVALUE;
@ -152,9 +153,9 @@ TEST_F(ColumnScanFilterTest, ColumnScan2Bytes)
in->NVALS = 0; in->NVALS = 0;
pp.setBlockPtr((int*) readBlockFromLiteralArray("col2block.cdf", block)); pp.setBlockPtr((int*) readBlockFromLiteralArray("col2block.cdf", block));
pp.columnScanAndFilter<IntegralType>(in, out, 4 * BLOCK_SIZE, &written); pp.columnScanAndFilter<IntegralType>(in, out);
results = reinterpret_cast<uint16_t*>(&output[sizeof(NewColResultHeader)]); results = getValuesArrayPosition<UT>(getFirstValueArrayPosition(out), 0);
EXPECT_EQ(out->NVALS, 4096); EXPECT_EQ(out->NVALS, 4096);
for (i = 0; i < out->NVALS; i++) for (i = 0; i < out->NVALS; i++)
@ -165,7 +166,8 @@ TEST_F(ColumnScanFilterTest, ColumnScan4Bytes)
{ {
constexpr const uint8_t W = 4; constexpr const uint8_t W = 4;
using IntegralType = datatypes::WidthToSIntegralType<W>::type; using IntegralType = datatypes::WidthToSIntegralType<W>::type;
datatypes::make_unsigned<IntegralType>::type* results; using UT = datatypes::make_unsigned<IntegralType>::type;
UT* results;
in->colType.DataSize = W; in->colType.DataSize = W;
in->colType.DataType = SystemCatalog::INT; in->colType.DataType = SystemCatalog::INT;
in->OutputType = OT_DATAVALUE; in->OutputType = OT_DATAVALUE;
@ -173,9 +175,9 @@ TEST_F(ColumnScanFilterTest, ColumnScan4Bytes)
in->NVALS = 0; in->NVALS = 0;
pp.setBlockPtr((int*) readBlockFromLiteralArray("col4block.cdf", block)); pp.setBlockPtr((int*) readBlockFromLiteralArray("col4block.cdf", block));
pp.columnScanAndFilter<IntegralType>(in, out, 4 * BLOCK_SIZE, &written); pp.columnScanAndFilter<IntegralType>(in, out);
results = reinterpret_cast<uint32_t*>(&output[sizeof(NewColResultHeader)]); results = getValuesArrayPosition<UT>(getFirstValueArrayPosition(out), 0);
EXPECT_EQ(out->NVALS, 2048); EXPECT_EQ(out->NVALS, 2048);
for (i = 0; i < out->NVALS; i++) for (i = 0; i < out->NVALS; i++)
@ -186,7 +188,8 @@ TEST_F(ColumnScanFilterTest, ColumnScan8Bytes)
{ {
constexpr const uint8_t W = 8; constexpr const uint8_t W = 8;
using IntegralType = datatypes::WidthToSIntegralType<W>::type; using IntegralType = datatypes::WidthToSIntegralType<W>::type;
datatypes::make_unsigned<IntegralType>::type* results; using UT = datatypes::make_unsigned<IntegralType>::type;
UT* results;
in->colType.DataSize = W; in->colType.DataSize = W;
in->colType.DataType = SystemCatalog::INT; in->colType.DataType = SystemCatalog::INT;
in->OutputType = OT_DATAVALUE; in->OutputType = OT_DATAVALUE;
@ -194,9 +197,9 @@ TEST_F(ColumnScanFilterTest, ColumnScan8Bytes)
in->NVALS = 0; in->NVALS = 0;
pp.setBlockPtr((int*) readBlockFromLiteralArray("col8block.cdf", block)); pp.setBlockPtr((int*) readBlockFromLiteralArray("col8block.cdf", block));
pp.columnScanAndFilter<IntegralType>(in, out, 4 * BLOCK_SIZE, &written); pp.columnScanAndFilter<IntegralType>(in, out);
results = reinterpret_cast<u_int64_t*>(&output[sizeof(NewColResultHeader)]); results = getValuesArrayPosition<UT>(getFirstValueArrayPosition(out), 0);
ASSERT_EQ(out->NVALS, 1024); ASSERT_EQ(out->NVALS, 1024);
for (i = 0; i < out->NVALS; i++) for (i = 0; i < out->NVALS; i++)
@ -207,7 +210,8 @@ TEST_F(ColumnScanFilterTest, ColumnScan1ByteUsingRID)
{ {
constexpr const uint8_t W = 1; constexpr const uint8_t W = 1;
using IntegralType = datatypes::WidthToSIntegralType<W>::type; using IntegralType = datatypes::WidthToSIntegralType<W>::type;
datatypes::make_unsigned<IntegralType>::type* results; using UT = datatypes::make_unsigned<IntegralType>::type;
UT* results;
in->colType.DataSize = W; in->colType.DataSize = W;
in->colType.DataType = SystemCatalog::INT; in->colType.DataType = SystemCatalog::INT;
in->OutputType = OT_DATAVALUE; in->OutputType = OT_DATAVALUE;
@ -217,9 +221,9 @@ TEST_F(ColumnScanFilterTest, ColumnScan1ByteUsingRID)
rids[1] = 17; rids[1] = 17;
pp.setBlockPtr((int*) readBlockFromLiteralArray("col1block.cdf", block)); pp.setBlockPtr((int*) readBlockFromLiteralArray("col1block.cdf", block));
pp.columnScanAndFilter<IntegralType>(in, out, 4 * BLOCK_SIZE, &written); pp.columnScanAndFilter<IntegralType>(in, out);
results = reinterpret_cast<uint8_t*>(&output[sizeof(NewColResultHeader)]); results = getValuesArrayPosition<UT>(getFirstValueArrayPosition(out), 0);
ASSERT_EQ(out->NVALS, 2); ASSERT_EQ(out->NVALS, 2);
for (i = 0; i < out->NVALS; i++) for (i = 0; i < out->NVALS; i++)
@ -230,7 +234,8 @@ TEST_F(ColumnScanFilterTest, ColumnScan4Bytes1Filter)
{ {
constexpr const uint8_t W = 4; constexpr const uint8_t W = 4;
using IntegralType = datatypes::WidthToSIntegralType<W>::type; using IntegralType = datatypes::WidthToSIntegralType<W>::type;
datatypes::make_unsigned<IntegralType>::type* results; using UT = datatypes::make_unsigned<IntegralType>::type;
UT* results;
IntegralType tmp; IntegralType tmp;
in->colType.DataSize = W; in->colType.DataSize = W;
in->colType.DataType = SystemCatalog::INT; in->colType.DataType = SystemCatalog::INT;
@ -249,9 +254,9 @@ TEST_F(ColumnScanFilterTest, ColumnScan4Bytes1Filter)
memcpy(args->val, &tmp, in->colType.DataSize); memcpy(args->val, &tmp, in->colType.DataSize);
pp.setBlockPtr((int*) readBlockFromLiteralArray("col4block.cdf", block)); pp.setBlockPtr((int*) readBlockFromLiteralArray("col4block.cdf", block));
pp.columnScanAndFilter<IntegralType>(in, out, 4 * BLOCK_SIZE, &written); pp.columnScanAndFilter<IntegralType>(in, out);
results = reinterpret_cast<uint32_t*>(&output[sizeof(NewColResultHeader)]); results = getValuesArrayPosition<UT>(getFirstValueArrayPosition(out), 0);
ASSERT_EQ(out->NVALS, 9); ASSERT_EQ(out->NVALS, 9);
for (i = 0; i < out->NVALS; i++) for (i = 0; i < out->NVALS; i++)
@ -263,7 +268,8 @@ TEST_F(ColumnScanFilterTest, ColumnScan8Bytes2CompFilters)
{ {
constexpr const uint8_t W = 8; constexpr const uint8_t W = 8;
using IntegralType = datatypes::WidthToSIntegralType<W>::type; using IntegralType = datatypes::WidthToSIntegralType<W>::type;
datatypes::make_unsigned<IntegralType>::type* results; using UT = datatypes::make_unsigned<IntegralType>::type;
UT* results;
IntegralType tmp; IntegralType tmp;
in->colType.DataSize = W; in->colType.DataSize = W;
@ -283,9 +289,9 @@ TEST_F(ColumnScanFilterTest, ColumnScan8Bytes2CompFilters)
memcpy(args->val, &tmp, in->colType.DataSize); memcpy(args->val, &tmp, in->colType.DataSize);
pp.setBlockPtr((int*) readBlockFromLiteralArray("col8block.cdf", block)); pp.setBlockPtr((int*) readBlockFromLiteralArray("col8block.cdf", block));
pp.columnScanAndFilter<IntegralType>(in, out, 4 * BLOCK_SIZE, &written); pp.columnScanAndFilter<IntegralType>(in, out);
results = reinterpret_cast<u_int64_t*>(&output[sizeof(NewColResultHeader)]); results = getValuesArrayPosition<UT>(getFirstValueArrayPosition(out), 0);
ASSERT_EQ(out->NVALS, 33); ASSERT_EQ(out->NVALS, 33);
for (i = 0; i < out->NVALS; i++) for (i = 0; i < out->NVALS; i++)
@ -296,7 +302,8 @@ TEST_F(ColumnScanFilterTest, ColumnScan8Bytes2EqFilters)
{ {
constexpr const uint8_t W = 8; constexpr const uint8_t W = 8;
using IntegralType = datatypes::WidthToSIntegralType<W>::type; using IntegralType = datatypes::WidthToSIntegralType<W>::type;
datatypes::make_unsigned<IntegralType>::type* results; using UT = datatypes::make_unsigned<IntegralType>::type;
UT* results;
IntegralType tmp; IntegralType tmp;
in->colType.DataSize = W; in->colType.DataSize = W;
@ -316,9 +323,9 @@ TEST_F(ColumnScanFilterTest, ColumnScan8Bytes2EqFilters)
memcpy(args->val, &tmp, in->colType.DataSize); memcpy(args->val, &tmp, in->colType.DataSize);
pp.setBlockPtr((int*) readBlockFromLiteralArray("col8block.cdf", block)); pp.setBlockPtr((int*) readBlockFromLiteralArray("col8block.cdf", block));
pp.columnScanAndFilter<IntegralType>(in, out, 4 * BLOCK_SIZE, &written); pp.columnScanAndFilter<IntegralType>(in, out);
results = reinterpret_cast<u_int64_t*>(&output[sizeof(NewColResultHeader)]); results = getValuesArrayPosition<UT>(getFirstValueArrayPosition(out), 0);
ASSERT_EQ(out->NVALS, 2); ASSERT_EQ(out->NVALS, 2);
ASSERT_EQ(results[0], 10); ASSERT_EQ(results[0], 10);
@ -329,7 +336,8 @@ TEST_F(ColumnScanFilterTest, ColumnScan8Bytes2EqFiltersRID)
{ {
constexpr const uint8_t W = 8; constexpr const uint8_t W = 8;
using IntegralType = datatypes::WidthToSIntegralType<W>::type; using IntegralType = datatypes::WidthToSIntegralType<W>::type;
datatypes::make_unsigned<IntegralType>::type* results; using UT = datatypes::make_unsigned<IntegralType>::type;
UT* results;
IntegralType tmp; IntegralType tmp;
in->colType.DataSize = W; in->colType.DataSize = W;
@ -355,9 +363,9 @@ TEST_F(ColumnScanFilterTest, ColumnScan8Bytes2EqFiltersRID)
rids[1] = 100; rids[1] = 100;
pp.setBlockPtr((int*) readBlockFromLiteralArray("col8block.cdf", block)); pp.setBlockPtr((int*) readBlockFromLiteralArray("col8block.cdf", block));
pp.columnScanAndFilter<IntegralType>(in, out, 4 * BLOCK_SIZE, &written); pp.columnScanAndFilter<IntegralType>(in, out);
results = reinterpret_cast<u_int64_t*>(&output[sizeof(NewColResultHeader)]); results = getValuesArrayPosition<UT>(getFirstValueArrayPosition(out), 0);
ASSERT_EQ(out->NVALS, 1); ASSERT_EQ(out->NVALS, 1);
ASSERT_EQ(results[0], 10); ASSERT_EQ(results[0], 10);
} }
@ -366,7 +374,7 @@ TEST_F(ColumnScanFilterTest, ColumnScan8Bytes2EqFiltersRIDOutputRid)
{ {
constexpr const uint8_t W = 8; constexpr const uint8_t W = 8;
using IntegralType = datatypes::WidthToSIntegralType<W>::type; using IntegralType = datatypes::WidthToSIntegralType<W>::type;
int16_t* results; primitives::RIDType* results;
IntegralType tmp; IntegralType tmp;
in->colType.DataSize = W; in->colType.DataSize = W;
@ -386,9 +394,9 @@ TEST_F(ColumnScanFilterTest, ColumnScan8Bytes2EqFiltersRIDOutputRid)
memcpy(args->val, &tmp, in->colType.DataSize); memcpy(args->val, &tmp, in->colType.DataSize);
pp.setBlockPtr((int*) readBlockFromLiteralArray("col8block.cdf", block)); pp.setBlockPtr((int*) readBlockFromLiteralArray("col8block.cdf", block));
pp.columnScanAndFilter<IntegralType>(in, out, 4 * BLOCK_SIZE, &written); pp.columnScanAndFilter<IntegralType>(in, out);
results = reinterpret_cast<int16_t*>(&output[sizeof(NewColResultHeader)]); results = getValuesArrayPosition<RIDType>(getFirstRIDArrayPosition(out), 0);
ASSERT_EQ(out->NVALS, 33); ASSERT_EQ(out->NVALS, 33);
for (i = 0; i < out->NVALS; i++) for (i = 0; i < out->NVALS; i++)
@ -400,8 +408,8 @@ TEST_F(ColumnScanFilterTest, ColumnScan8Bytes2EqFiltersRIDOutputBoth)
constexpr const uint8_t W = 8; constexpr const uint8_t W = 8;
using IntegralType = datatypes::WidthToSIntegralType<W>::type; using IntegralType = datatypes::WidthToSIntegralType<W>::type;
IntegralType tmp; IntegralType tmp;
IntegralType* resultVal; IntegralType* resultVal = getValuesArrayPosition<IntegralType>(getFirstValueArrayPosition(out), 0);
int16_t* resultRid; primitives::RIDType* resultRid = getRIDArrayPosition(getFirstRIDArrayPosition(out), 0);
in->colType.DataSize = W; in->colType.DataSize = W;
in->colType.DataType = SystemCatalog::INT; in->colType.DataType = SystemCatalog::INT;
@ -420,17 +428,14 @@ TEST_F(ColumnScanFilterTest, ColumnScan8Bytes2EqFiltersRIDOutputBoth)
memcpy(args->val, &tmp, in->colType.DataSize); memcpy(args->val, &tmp, in->colType.DataSize);
pp.setBlockPtr((int*) readBlockFromLiteralArray("col8block.cdf", block)); pp.setBlockPtr((int*) readBlockFromLiteralArray("col8block.cdf", block));
pp.columnScanAndFilter<IntegralType>(in, out, 4 * BLOCK_SIZE, &written); pp.columnScanAndFilter<IntegralType>(in, out);
ASSERT_EQ(out->NVALS, 33); ASSERT_EQ(out->NVALS, 33);
for (i = 0; i < out->NVALS; i++) for (i = 0; i < out->NVALS; i++)
{ {
resultRid = reinterpret_cast<int16_t*>(&output[ ASSERT_EQ(resultRid[i], (i < 10 ? i : i - 10 + 1001));
sizeof(NewColResultHeader) + i * (sizeof(int16_t) + in->colType.DataSize)]); ASSERT_EQ(resultVal[i], (i < 10 ? i : i - 10 + 1001));
resultVal = reinterpret_cast<int64_t*>(&resultRid[1]);
ASSERT_EQ(*resultRid, (i < 10 ? i : i - 10 + 1001));
ASSERT_EQ(*resultVal, (i < 10 ? i : i - 10 + 1001));
} }
} }
@ -439,7 +444,8 @@ TEST_F(ColumnScanFilterTest, ColumnScan1Byte2CompFilters)
{ {
constexpr const uint8_t W = 1; constexpr const uint8_t W = 1;
using IntegralType = datatypes::WidthToSIntegralType<W>::type; using IntegralType = datatypes::WidthToSIntegralType<W>::type;
datatypes::make_unsigned<IntegralType>::type* results; using UT = datatypes::make_unsigned<IntegralType>::type;
UT* results;
in->colType = ColRequestHeaderDataType(); in->colType = ColRequestHeaderDataType();
in->colType.DataSize = W; in->colType.DataSize = W;
@ -457,9 +463,9 @@ TEST_F(ColumnScanFilterTest, ColumnScan1Byte2CompFilters)
args->val[0] = '4'; args->val[0] = '4';
pp.setBlockPtr((int*) readBlockFromLiteralArray("col1block.cdf", block)); pp.setBlockPtr((int*) readBlockFromLiteralArray("col1block.cdf", block));
pp.columnScanAndFilter<IntegralType>(in, out, 4 * BLOCK_SIZE, &written); pp.columnScanAndFilter<IntegralType>(in, out);
results = &output[sizeof(NewColResultHeader)]; results = getValuesArrayPosition<UT>(getFirstValueArrayPosition(out), 0);
ASSERT_EQ(out->NVALS, 32); ASSERT_EQ(out->NVALS, 32);
for (i = 0; i < out->NVALS; i++) for (i = 0; i < out->NVALS; i++)
@ -506,9 +512,9 @@ TEST_F(ColumnScanFilterTest, ColumnScan4Bytes2CompFiltersOutputRID)
memcpy(&args->val[in->colType.DataSize], &ridTmp, 2); memcpy(&args->val[in->colType.DataSize], &ridTmp, 2);
pp.setBlockPtr((int*) readBlockFromLiteralArray("col4block.cdf", block)); pp.setBlockPtr((int*) readBlockFromLiteralArray("col4block.cdf", block));
pp.columnScanAndFilter<IntegralType>(in, out, 4 * BLOCK_SIZE, &written); pp.columnScanAndFilter<IntegralType>(in, out);
results = reinterpret_cast<int16_t*>(&output[sizeof(NewColResultHeader)]); results = reinterpret_cast<int16_t*>(&output[sizeof(ColResultHeader)]);
ASSERT_EQ(out->NVALS, 2); ASSERT_EQ(out->NVALS, 2);
ASSERT_EQ(results[0], 8); ASSERT_EQ(results[0], 8);
ASSERT_EQ(results[1], 11); ASSERT_EQ(results[1], 11);
@ -518,9 +524,9 @@ TEST_F(ColumnScanFilterTest, ColumnScan4Bytes2CompFiltersOutputRID)
TEST_F(ColumnScanFilterTest, ColumnScan8BytesDouble2CompFilters) TEST_F(ColumnScanFilterTest, ColumnScan8BytesDouble2CompFilters)
{ {
constexpr const uint8_t W = 8; constexpr const uint8_t W = 8;
using IntegralType = datatypes::WidthToSIntegralType<W>::type; using IntegralType = double;
double* results; IntegralType* results;
double tmp; IntegralType tmp;
in->colType.DataSize = W; in->colType.DataSize = W;
in->colType.DataType = SystemCatalog::DOUBLE; in->colType.DataType = SystemCatalog::DOUBLE;
@ -538,9 +544,9 @@ TEST_F(ColumnScanFilterTest, ColumnScan8BytesDouble2CompFilters)
memcpy(args->val, &tmp, sizeof(tmp)); memcpy(args->val, &tmp, sizeof(tmp));
pp.setBlockPtr((int*) readBlockFromLiteralArray("col_double_block.cdf", block)); pp.setBlockPtr((int*) readBlockFromLiteralArray("col_double_block.cdf", block));
pp.columnScanAndFilter<IntegralType>(in, out, 4 * BLOCK_SIZE, &written); pp.columnScanAndFilter<int64_t>(in, out);
results = reinterpret_cast<double*>(&output[sizeof(NewColResultHeader)]); results = getValuesArrayPosition<IntegralType>(getFirstValueArrayPosition(out), 0);
ASSERT_EQ(out->NVALS, 8); ASSERT_EQ(out->NVALS, 8);
for (i = 0; i < out->NVALS; i++) for (i = 0; i < out->NVALS; i++)
@ -551,10 +557,10 @@ TEST_F(ColumnScanFilterTest, ColumnScan8BytesDouble2CompFilters)
TEST_F(ColumnScanFilterTest, ColumnScan4BytesFloat2CompFiltersOutputBoth) TEST_F(ColumnScanFilterTest, ColumnScan4BytesFloat2CompFiltersOutputBoth)
{ {
constexpr const uint8_t W = 4; constexpr const uint8_t W = 4;
using IntegralType = datatypes::WidthToSIntegralType<W>::type; using IntegralType = float;
float* resultVal; IntegralType tmp;
float tmp; IntegralType* resultVal = getValuesArrayPosition<IntegralType>(getFirstValueArrayPosition(out), 0);
int16_t* resultRid; RIDType* resultRid = getRIDArrayPosition(getFirstRIDArrayPosition(out), 0);
in->colType.DataSize = W; in->colType.DataSize = W;
in->colType.DataType = SystemCatalog::FLOAT; in->colType.DataType = SystemCatalog::FLOAT;
@ -572,27 +578,24 @@ TEST_F(ColumnScanFilterTest, ColumnScan4BytesFloat2CompFiltersOutputBoth)
memcpy(args->val, &tmp, sizeof(tmp)); memcpy(args->val, &tmp, sizeof(tmp));
pp.setBlockPtr((int*) readBlockFromLiteralArray("col_float_block.cdf", block)); pp.setBlockPtr((int*) readBlockFromLiteralArray("col_float_block.cdf", block));
pp.columnScanAndFilter<IntegralType>(in, out, 4 * BLOCK_SIZE, &written); pp.columnScanAndFilter<int32_t>(in, out);
ASSERT_EQ(out->NVALS, 8); ASSERT_EQ(out->NVALS, 8);
for (i = 0; i < out->NVALS; i++) for (i = 0; i < out->NVALS; i++)
{ {
resultRid = reinterpret_cast<int16_t*>(&output[ ASSERT_EQ(resultRid[i], 19 + i);
sizeof(NewColResultHeader) + i * (sizeof(int16_t) + in->colType.DataSize)]); ASSERT_EQ(resultVal[i], 11 + (i * 0.5));
resultVal = reinterpret_cast<float*>(&resultRid[1]);
ASSERT_EQ(*resultVal, 11 + (i * 0.5));
} }
} }
//void p_Col_neg_float_1() //void p_Col_neg_float_1()
TEST_F(ColumnScanFilterTest, ColumnScan4BytesNegFloat2CompFiltersOutputBoth) TEST_F(ColumnScanFilterTest, ColumnScan4BytesNegFloat2CompFiltersOutputBoth)
{ {
constexpr const uint8_t W = 4; using IntegralType = float;
using IntegralType = datatypes::WidthToSIntegralType<W>::type; IntegralType tmp;
float* resultVal; IntegralType* resultVal = getValuesArrayPosition<IntegralType>(getFirstValueArrayPosition(out), 0);
float tmp; RIDType* resultRid = getRIDArrayPosition(getFirstRIDArrayPosition(out), 0);
int16_t* resultRid;
in->colType.DataSize = 4; in->colType.DataSize = 4;
in->colType.DataType = SystemCatalog::FLOAT; in->colType.DataType = SystemCatalog::FLOAT;
@ -610,15 +613,13 @@ TEST_F(ColumnScanFilterTest, ColumnScan4BytesNegFloat2CompFiltersOutputBoth)
memcpy(args->val, &tmp, sizeof(tmp)); memcpy(args->val, &tmp, sizeof(tmp));
pp.setBlockPtr((int*) readBlockFromLiteralArray("col_neg_float.cdf", block)); pp.setBlockPtr((int*) readBlockFromLiteralArray("col_neg_float.cdf", block));
pp.columnScanAndFilter<IntegralType>(in, out, 4 * BLOCK_SIZE, &written); pp.columnScanAndFilter<int32_t>(in, out);
ASSERT_EQ(out->NVALS, 19); ASSERT_EQ(out->NVALS, 19);
for (i = 0; i < out->NVALS; i++) for (i = 0; i < out->NVALS; i++)
{ {
resultRid = reinterpret_cast<int16_t*>(&output[ ASSERT_EQ(resultRid[i], 12 + i);
sizeof(NewColResultHeader) + i * (sizeof(int16_t) + in->colType.DataSize)]); ASSERT_EQ(resultVal[i], -4.5 + (i * 0.5));
resultVal = reinterpret_cast<float*>(&resultRid[1]);
ASSERT_EQ(*resultVal, -4.5 + (i * 0.5));
} }
} }
@ -626,9 +627,9 @@ TEST_F(ColumnScanFilterTest, ColumnScan4BytesNegFloat2CompFiltersOutputBoth)
TEST_F(ColumnScanFilterTest, ColumnScan4BytesNegDouble2CompFilters) TEST_F(ColumnScanFilterTest, ColumnScan4BytesNegDouble2CompFilters)
{ {
constexpr const uint8_t W = 8; constexpr const uint8_t W = 8;
using IntegralType = datatypes::WidthToSIntegralType<W>::type; using IntegralType = double;
double* results; IntegralType* results = getValuesArrayPosition<IntegralType>(getFirstValueArrayPosition(out), 0);
double tmp; IntegralType tmp;
in->colType.DataSize = W; in->colType.DataSize = W;
in->colType.DataType = SystemCatalog::DOUBLE; in->colType.DataType = SystemCatalog::DOUBLE;
@ -646,13 +647,14 @@ TEST_F(ColumnScanFilterTest, ColumnScan4BytesNegDouble2CompFilters)
memcpy(args->val, &tmp, sizeof(tmp)); memcpy(args->val, &tmp, sizeof(tmp));
pp.setBlockPtr((int*) readBlockFromLiteralArray("col_neg_double.cdf", block)); pp.setBlockPtr((int*) readBlockFromLiteralArray("col_neg_double.cdf", block));
pp.columnScanAndFilter<IntegralType>(in, out, 4 * BLOCK_SIZE, &written); pp.columnScanAndFilter<int64_t>(in, out);
results = reinterpret_cast<double*>(&output[sizeof(NewColResultHeader)]); //ASSERT_EQ(out->NVALS, 19);
ASSERT_EQ(out->NVALS, 19);
for (i = 0; i < out->NVALS; i++) for (i = 0; i < out->NVALS; i++)
ASSERT_EQ(results[i], -4.5 + (i * 0.5)); {
ASSERT_EQ(results[i], -4.5 + (i * 0.5));
}
} }
TEST_F(ColumnScanFilterTest, ColumnScan16Bytes2CompFilters) TEST_F(ColumnScanFilterTest, ColumnScan16Bytes2CompFilters)

View File

@ -463,6 +463,7 @@ public:
for the other types as well as the getters. for the other types as well as the getters.
*/ */
template<int len> void setUintField_offset(uint64_t val, uint32_t offset); template<int len> void setUintField_offset(uint64_t val, uint32_t offset);
template<typename T>void setIntField_offset(const T val, const uint32_t offset);
inline void nextRow(uint32_t size); inline void nextRow(uint32_t size);
inline void prevRow(uint32_t size, uint64_t number); inline void prevRow(uint32_t size, uint64_t number);
@ -1210,6 +1211,12 @@ inline void Row::setUintField_offset(uint64_t val, uint32_t offset)
} }
} }
template<typename T>
inline void Row::setIntField_offset(const T val, const uint32_t offset)
{
*((T*) &data[offset]) = val;
}
inline void Row::nextRow(uint32_t size) inline void Row::nextRow(uint32_t size)
{ {
data += size; data += size;