1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-641 Basic extent elimination support for Decimal38.

This commit is contained in:
Gagan Goel
2020-02-01 22:16:58 -05:00
committed by Roman Nozdrin
parent 84f9821720
commit 55afcd8890
33 changed files with 1318 additions and 325 deletions

View File

@ -573,6 +573,10 @@ keepGoing:
{
colDefPtr->fType->fLength = 8;
}
else if (colDefPtr->fType->fPrecision > 18 && colDefPtr->fType->fPrecision < 39)
{
colDefPtr->fType->fLength = 16;
}
}
bytestream << (fStartingColOID + (colNum++) + 1);

View File

@ -510,6 +510,8 @@ int DMLPackageProcessor::commitBatchAutoOnTransaction(uint64_t uniqueId, BRM::Tx
aInfo.firstLbid = *iter;
aInfo.max = numeric_limits<int64_t>::min(); // Not used
aInfo.min = numeric_limits<int64_t>::max(); // Not used
dataconvert::DataConvert::int128Min(aInfo.bigMax); // Not used
dataconvert::DataConvert::int128Max(aInfo.bigMin); // Not used
aInfo.seqNum = -1;
cpInfos.push_back(aInfo);
++iter;

View File

@ -1016,6 +1016,7 @@ inline bool isUnsigned(const execplan::CalpontSystemCatalog::ColDataType type)
case execplan::CalpontSystemCatalog::UMEDINT:
case execplan::CalpontSystemCatalog::UINT:
case execplan::CalpontSystemCatalog::UBIGINT:
// TODO MCOL-641 add decimal here
return true;
default:

View File

@ -411,6 +411,7 @@ void BatchPrimitiveProcessorJL::addElementType(const StringElementType& et, uint
* block touches
*/
// TODO MCOL-641 Add support here. Refer to BatchPrimitiveProcessor::makeResponse()
void BatchPrimitiveProcessorJL::getElementTypes(ByteStream& in,
vector<ElementType>* out, bool* validCPData, uint64_t* lbid, int64_t* min,
int64_t* max, uint32_t* cachedIO, uint32_t* physIO,
@ -491,6 +492,7 @@ void BatchPrimitiveProcessorJL::getElementTypes(ByteStream& in,
* blocks touched
*/
// TODO MCOL-641 Add support here. Refer to BatchPrimitiveProcessor::makeResponse()
void BatchPrimitiveProcessorJL::getStringElementTypes(ByteStream& in,
vector<StringElementType>* out, bool* validCPData, uint64_t* lbid, int64_t* min,
int64_t* max, uint32_t* cachedIO, uint32_t* physIO, uint32_t* touchedBlocks) const
@ -547,6 +549,7 @@ void BatchPrimitiveProcessorJL::getStringElementTypes(ByteStream& in,
* as when the output type is TableBands
*/
// TODO MCOL-641 Add support here. Refer to BatchPrimitiveProcessor::makeResponse()
void BatchPrimitiveProcessorJL::getTuples(messageqcpp::ByteStream& in,
std::vector<TupleType>* out, bool* validCPData, uint64_t* lbid, int64_t* min,
int64_t* max, uint32_t* cachedIO, uint32_t* physIO, uint32_t* touchedBlocks) const
@ -692,9 +695,9 @@ bool BatchPrimitiveProcessorJL::countThisMsg(messageqcpp::ByteStream& in) const
if (_hasScan)
{
if (data[offset] != 0)
offset += 25; // skip the CP data
offset += (data[offset + CP_FLAG_AND_LBID] * 2) + CP_FLAG_AND_LBID + 1; // skip the CP data with wide min/max values (16/32 bytes each)
else
offset += 9; // skip only the "valid CP data" & LBID bytes
offset += CP_FLAG_AND_LBID; // skip only the "valid CP data" & LBID bytes
}
idbassert(in.length() > offset);
@ -718,11 +721,12 @@ void BatchPrimitiveProcessorJL::deserializeAggregateResult(ByteStream* in,
}
void BatchPrimitiveProcessorJL::getRowGroupData(ByteStream& in, vector<RGData>* out,
bool* validCPData, uint64_t* lbid, int64_t* min, int64_t* max,
bool* validCPData, uint64_t* lbid, __int128* min, __int128* max,
uint32_t* cachedIO, uint32_t* physIO, uint32_t* touchedBlocks, bool* countThis,
uint32_t threadID) const
uint32_t threadID, bool* hasBinaryColumn, const execplan::CalpontSystemCatalog::ColType& colType) const
{
uint64_t tmp64;
unsigned __int128 tmp128;
uint8_t tmp8;
RGData rgData;
uint32_t rowCount;
@ -754,10 +758,23 @@ void BatchPrimitiveProcessorJL::getRowGroupData(ByteStream& in, vector<RGData>*
if (*validCPData)
{
in >> *lbid;
in >> tmp8;
*hasBinaryColumn = (tmp8 > 8);
if (*hasBinaryColumn)
{
idbassert(colType.colWidth > 8);
in >> tmp128;
*min = tmp128;
in >> tmp128;
*max = tmp128;
}
else
{
in >> tmp64;
*min = (int64_t) tmp64;
*min = static_cast<__int128>(tmp64);
in >> tmp64;
*max = (int64_t) tmp64;
*max = static_cast<__int128>(tmp64);
}
}
else
in >> *lbid;

View File

@ -49,6 +49,8 @@ namespace joblist
{
const uint32_t LOGICAL_BLOCKS_CONVERTER = 23; // 10 + 13. 13 to convert to logical blocks,
// 10 to convert to groups of 1024 logical blocks
const uint8_t CP_FLAG_AND_LBID = 9; // # bytes used for CP boolean and the lbid
// used by BatchPrimitiveProcessorJL::countThisMsg()
// forward reference
struct JobInfo;
@ -166,9 +168,9 @@ public:
void deserializeAggregateResults(messageqcpp::ByteStream* in,
std::vector<rowgroup::RGData>* out) const;
void getRowGroupData(messageqcpp::ByteStream& in, std::vector<rowgroup::RGData>* out,
bool* validCPData, uint64_t* lbid, int64_t* min, int64_t* max,
bool* validCPData, uint64_t* lbid, __int128* min, __int128* max,
uint32_t* cachedIO, uint32_t* physIO, uint32_t* touchedBlocks, bool* countThis,
uint32_t threadID) const;
uint32_t threadID, bool* hasBinaryColumn, const execplan::CalpontSystemCatalog::ColType& colType) const;
void deserializeAggregateResult(messageqcpp::ByteStream* in,
std::vector<rowgroup::RGData>* out) const;
bool countThisMsg(messageqcpp::ByteStream& in) const;

View File

@ -180,8 +180,8 @@ void LBIDList::Dump(long Index, int Count) const
// Store max/min structure for update later. lbidPartitionVector serves a list of lbid,max,min to update
// when the primitive returns with valid max/min values and the brm returns an invalid flag for the
// requested lbid
//
bool LBIDList::GetMinMax(int64_t& min, int64_t& max, int64_t& seq, int64_t lbid,
template <typename T>
bool LBIDList::GetMinMax(T& min, T& max, int64_t& seq, int64_t lbid,
const std::vector<struct BRM::EMEntry>* pEMEntries,
execplan::CalpontSystemCatalog::ColDataType colDataType)
{
@ -234,15 +234,31 @@ bool LBIDList::GetMinMax(int64_t& min, int64_t& max, int64_t& seq, int64_t lbid,
mmp->seq = seq32;
if (isUnsigned(colDataType))
{
if (typeid(T) == typeid(__int128))
{
mmp->bigMax = 0;
mmp->bigMin = -1;
}
else
{
mmp->max = 0;
mmp->min = static_cast<int64_t>(numeric_limits<uint64_t>::max());
}
}
else
{
if (typeid(T) == typeid(__int128))
{
dataconvert::DataConvert::int128Min(mmp->bigMax);
dataconvert::DataConvert::int128Max(mmp->bigMin);
}
else
{
mmp->max = numeric_limits<int64_t>::min();
mmp->min = numeric_limits<int64_t>::max();
}
}
mmp->isValid = retVal;
mmp->blksScanned = 0;
@ -257,6 +273,7 @@ bool LBIDList::GetMinMax(int64_t& min, int64_t& max, int64_t& seq, int64_t lbid,
return false;
}
//TODO MCOL-641 Do we need support here?
bool LBIDList::GetMinMax(int64_t* min, int64_t* max, int64_t* seq,
int64_t lbid, const tr1::unordered_map<int64_t, BRM::EMEntry>& entries,
execplan::CalpontSystemCatalog::ColDataType colDataType)
@ -302,7 +319,8 @@ bool LBIDList::GetMinMax(int64_t* min, int64_t* max, int64_t* seq,
// Get the min, max, and sequence number for the specified LBID by searching
// the given vector of ExtentMap entries.
int LBIDList::getMinMaxFromEntries(int64_t& min, int64_t& max, int32_t& seq,
template <typename T>
int LBIDList::getMinMaxFromEntries(T& min, T& max, int32_t& seq,
int64_t lbid, const std::vector<struct BRM::EMEntry>& EMEntries)
{
for (unsigned i = 0; i < EMEntries.size(); i++)
@ -310,9 +328,17 @@ int LBIDList::getMinMaxFromEntries(int64_t& min, int64_t& max, int32_t& seq,
int64_t lastLBID = EMEntries[i].range.start + (EMEntries[i].range.size * 1024) - 1;
if (lbid >= EMEntries[i].range.start && lbid <= lastLBID)
{
if (typeid(T) == typeid(__int128))
{
min = EMEntries[i].partition.cprange.bigLoVal;
max = EMEntries[i].partition.cprange.bigHiVal;
}
else
{
min = EMEntries[i].partition.cprange.lo_val;
max = EMEntries[i].partition.cprange.hi_val;
}
seq = EMEntries[i].partition.cprange.sequenceNum;
return EMEntries[i].partition.cprange.isValid;
}
@ -321,8 +347,8 @@ int LBIDList::getMinMaxFromEntries(int64_t& min, int64_t& max, int32_t& seq,
return BRM::CP_INVALID;
}
//
void LBIDList::UpdateMinMax(int64_t min, int64_t max, int64_t lbid, CalpontSystemCatalog::ColDataType type,
template <typename T>
void LBIDList::UpdateMinMax(T min, T max, int64_t lbid, CalpontSystemCatalog::ColDataType type,
bool validData)
{
MinMaxPartition* mmp = NULL;
@ -372,6 +398,16 @@ void LBIDList::UpdateMinMax(int64_t min, int64_t max, int64_t lbid, CalpontSyste
mmp->max = max;
}
else if (execplan::isUnsigned(type))
{
if (typeid(T) == typeid(__int128))
{
if (static_cast<unsigned __int128>(min) < static_cast<unsigned __int128>(mmp->bigMin))
mmp->bigMin = min;
if (static_cast<unsigned __int128>(max) > static_cast<unsigned __int128>(mmp->bigMax))
mmp->bigMax = max;
}
else
{
if (static_cast<uint64_t>(min) < static_cast<uint64_t>(mmp->min))
mmp->min = min;
@ -379,6 +415,17 @@ void LBIDList::UpdateMinMax(int64_t min, int64_t max, int64_t lbid, CalpontSyste
if (static_cast<uint64_t>(max) > static_cast<uint64_t>(mmp->max))
mmp->max = max;
}
}
else
{
if (typeid(T) == typeid(__int128))
{
if (min < mmp->bigMin)
mmp->bigMin = min;
if (max > mmp->bigMax)
mmp->bigMax = max;
}
else
{
if (min < mmp->min)
@ -387,6 +434,7 @@ void LBIDList::UpdateMinMax(int64_t min, int64_t max, int64_t lbid, CalpontSyste
if (max > mmp->max)
mmp->max = max;
}
}
}
@ -405,7 +453,7 @@ void LBIDList::UpdateMinMax(int64_t min, int64_t max, int64_t lbid, CalpontSyste
}
}
void LBIDList::UpdateAllPartitionInfo()
void LBIDList::UpdateAllPartitionInfo(const execplan::CalpontSystemCatalog::ColType& colType)
{
MinMaxPartition* mmp = NULL;
#ifdef DEBUG
@ -431,8 +479,17 @@ void LBIDList::UpdateAllPartitionInfo()
if ((mmp->isValid == BRM::CP_INVALID) && (mmp->blksScanned > 0))
{
cpInfo.firstLbid = mmp->lbid;
cpInfo.isBinaryColumn = (colType.colWidth == 16);
if (cpInfo.isBinaryColumn)
{
cpInfo.bigMax = mmp->bigMax;
cpInfo.bigMin = mmp->bigMin;
}
else
{
cpInfo.max = mmp->max;
cpInfo.min = mmp->min;
}
cpInfo.seqNum = (int32_t)mmp->seq;
vCpInfo.push_back(cpInfo);
@ -638,8 +695,7 @@ bool LBIDList::checkRangeOverlap(int64_t min, int64_t max, int64_t tmin, int64_t
}
}
bool LBIDList::CasualPartitionPredicate(const int64_t Min,
const int64_t Max,
bool LBIDList::CasualPartitionPredicate(const BRM::EMCasualPartition_t& cpRange,
const messageqcpp::ByteStream* bs,
const uint16_t NOPS,
const execplan::CalpontSystemCatalog::ColType& ct,
@ -650,6 +706,9 @@ bool LBIDList::CasualPartitionPredicate(const int64_t Min,
const char* MsgDataPtr = (const char*) bs->buf();
bool scan = true;
int64_t value = 0;
__int128 bigValue = 0;
dataconvert::Int128Pod_t* bigValuePod;
bigValuePod = reinterpret_cast<dataconvert::Int128Pod_t*>(&bigValue);
bool bIsUnsigned = execplan::isUnsigned(ct.colDataType);
bool bIsChar = execplan::isCharType(ct.colDataType);
@ -700,7 +759,13 @@ bool LBIDList::CasualPartitionPredicate(const int64_t Min,
value = static_cast<int64_t>(val);
}
case 16:
cout << __FILE__<< ":" <<__LINE__ << " Fix for 16 Bytes ?" << endl;
{
unsigned __int128 val;
bigValuePod = reinterpret_cast<dataconvert::Int128Pod_t*>(&val);
bigValuePod->lo = *reinterpret_cast<const uint64_t*>(MsgDataPtr);
bigValuePod->hi = *(reinterpret_cast<const uint64_t*>(MsgDataPtr) + 1);
bigValue = static_cast<__int128>(val);
}
}
}
else
@ -734,11 +799,15 @@ bool LBIDList::CasualPartitionPredicate(const int64_t Min,
value = val;
}
case 16:
cout << __FILE__<< ":" <<__LINE__ << " Fix for 16 Bytes ?" << endl;
{
bigValuePod->lo = *reinterpret_cast<const uint64_t*>(MsgDataPtr);
bigValuePod->hi = *(reinterpret_cast<const uint64_t*>(MsgDataPtr) + 1);
}
}
}
// Should we also check for empty here?
// TODO MCOL-641
if (isNull(value, ct)) // This will work even if the data column is unsigned.
{
continue;
@ -750,8 +819,8 @@ bool LBIDList::CasualPartitionPredicate(const int64_t Min,
{
// MCOL-1246 Trim trailing whitespace for matching so that we have
// the same as InnoDB behaviour
int64_t tMin = Min;
int64_t tMax = Max;
int64_t tMin = cpRange.lo_val;
int64_t tMax = cpRange.hi_val;
dataconvert::DataConvert::trimWhitespace(tMin);
dataconvert::DataConvert::trimWhitespace(tMax);
@ -761,11 +830,25 @@ bool LBIDList::CasualPartitionPredicate(const int64_t Min,
}
else if (bIsUnsigned)
{
scan = compareVal(static_cast<uint64_t>(Min), static_cast<uint64_t>(Max), static_cast<uint64_t>(value), op, lcf);
if (ct.colWidth <= 8)
{
scan = compareVal(static_cast<uint64_t>(cpRange.lo_val), static_cast<uint64_t>(cpRange.hi_val), static_cast<uint64_t>(value), op, lcf);
}
else if (ct.colWidth == 16)
{
scan = compareVal(static_cast<unsigned __int128>(cpRange.bigLoVal), static_cast<unsigned __int128>(cpRange.bigHiVal), static_cast<unsigned __int128>(bigValue), op, lcf);
}
}
else
{
scan = compareVal(Min, Max, value, op, lcf);
if (ct.colWidth <= 8)
{
scan = compareVal(cpRange.lo_val, cpRange.hi_val, value, op, lcf);
}
else if (ct.colWidth == 16)
{
scan = compareVal(cpRange.bigLoVal, cpRange.bigHiVal, bigValue, op, lcf);
}
}
if (BOP == BOP_AND && !scan)
@ -820,6 +903,23 @@ void LBIDList::copyLbidList(const LBIDList& rhs)
fDebug = rhs.fDebug;
}
template
bool LBIDList::GetMinMax<__int128>(__int128& min, __int128& max, int64_t& seq, int64_t lbid,
const std::vector<struct BRM::EMEntry>* pEMEntries,
execplan::CalpontSystemCatalog::ColDataType colDataType);
template
bool LBIDList::GetMinMax<int64_t>(int64_t& min, int64_t& max, int64_t& seq, int64_t lbid,
const std::vector<struct BRM::EMEntry>* pEMEntries,
execplan::CalpontSystemCatalog::ColDataType colDataType);
template
void LBIDList::UpdateMinMax<__int128>(__int128 min, __int128 max, int64_t lbid,
execplan::CalpontSystemCatalog::ColDataType type, bool validData = true);
template
void LBIDList::UpdateMinMax<int64_t>(int64_t min, int64_t max, int64_t lbid,
execplan::CalpontSystemCatalog::ColDataType type, bool validData = true);
} //namespace joblist
// vim:ts=4 sw=4:

View File

@ -50,11 +50,19 @@ struct MinMaxPartition
{
int64_t lbid;
int64_t lbidmax;
int64_t min;
int64_t max;
int64_t seq;
int isValid;
uint32_t blksScanned;
union
{
__int128 bigMin;
int64_t min;
};
union
{
__int128 bigMax;
int64_t max;
};
};
/** @brief class LBIDList
@ -84,7 +92,8 @@ public:
// If pEMEntries is provided, then min/max will be extracted from that
// vector, else extents in BRM will be searched. If type is unsigned, caller
// should static cast returned min and max to uint64_t
bool GetMinMax(int64_t& min, int64_t& max, int64_t& seq, int64_t lbid,
template<typename T>
bool GetMinMax(T& min, T& max, int64_t& seq, int64_t lbid,
const std::vector<struct BRM::EMEntry>* pEMEntries,
execplan::CalpontSystemCatalog::ColDataType type);
@ -92,15 +101,15 @@ public:
const std::tr1::unordered_map<int64_t, BRM::EMEntry>& entries,
execplan::CalpontSystemCatalog::ColDataType type);
void UpdateMinMax(int64_t min, int64_t max, int64_t lbid,
template <typename T>
void UpdateMinMax(T min, T max, int64_t lbid,
execplan::CalpontSystemCatalog::ColDataType type, bool validData = true);
void UpdateAllPartitionInfo();
void UpdateAllPartitionInfo(const execplan::CalpontSystemCatalog::ColType& colType);
bool IsRangeBoundary(uint64_t lbid);
bool CasualPartitionPredicate(const int64_t Min,
const int64_t Max,
bool CasualPartitionPredicate(const BRM::EMCasualPartition_t& cpRange,
const messageqcpp::ByteStream* MsgDataPtr,
const uint16_t NOPS,
const execplan::CalpontSystemCatalog::ColType& ct,
@ -135,7 +144,8 @@ private:
template<class T>
inline bool compareVal(const T& Min, const T& Max, const T& value, char op, uint8_t lcf);
int getMinMaxFromEntries(int64_t& min, int64_t& max, int32_t& seq,
template <typename T>
int getMinMaxFromEntries(T& min, T& max, int32_t& seq,
int64_t lbid, const std::vector<struct BRM::EMEntry>& EMEntries);
boost::shared_ptr<BRM::DBRM> em;

View File

@ -704,8 +704,8 @@ struct NewColResultHeader
uint16_t NVALS;
uint16_t ValidMinMax; // 1 if Min/Max are valid, otherwise 0
uint32_t OutputType;
int64_t Min; // Minimum value in this block for signed data types
int64_t Max; // Maximum value in this block for signed data types
__int128 Min; // Minimum value in this block for signed data types
__int128 Max; // Maximum value in this block for signed data types
uint32_t CacheIO; // I/O count from buffer cache
uint32_t PhysicalIO; // Physical I/O count from disk
// if OutputType was OT_DATAVALUE, what follows is DataType[NVALS]

View File

@ -838,8 +838,7 @@ void TupleBPS::storeCasualPartitionInfo(const bool estimateRowCounts)
scanFlags[idx] = scanFlags[idx] &&
(ignoreCP || extent.partition.cprange.isValid != BRM::CP_VALID ||
lbidListVec[i]->CasualPartitionPredicate(
extent.partition.cprange.lo_val,
extent.partition.cprange.hi_val,
extent.partition.cprange,
&(colCmd->getFilterString()),
colCmd->getFilterCount(),
colCmd->getColType(),
@ -928,6 +927,7 @@ void TupleBPS::prepCasualPartitioning()
{
uint32_t i;
int64_t min, max, seq;
__int128 bigMin, bigMax;
boost::mutex::scoped_lock lk(cpMutex);
for (i = 0; i < scannedExtents.size(); i++)
@ -938,9 +938,19 @@ void TupleBPS::prepCasualPartitioning()
if (scanFlags[i] && lbidList->CasualPartitionDataType(fColType.colDataType,
fColType.colWidth))
{
if (fColType.colWidth <= 8)
{
lbidList->GetMinMax(min, max, seq, (int64_t) scannedExtents[i].range.start,
&scannedExtents, fColType.colDataType);
}
else if (fColType.colWidth == 16)
{
lbidList->GetMinMax(bigMin, bigMax, seq, (int64_t) scannedExtents[i].range.start,
&scannedExtents, fColType.colDataType);
}
}
}
else
scanFlags[i] = true;
}
@ -1859,8 +1869,17 @@ abort:
struct _CPInfo
{
_CPInfo(int64_t MIN, int64_t MAX, uint64_t l, bool val) : min(MIN), max(MAX), LBID(l), valid(val) { };
_CPInfo(__int128 BIGMIN, __int128 BIGMAX, uint64_t l, bool val) : bigMin(BIGMIN), bigMax(BIGMAX), LBID(l), valid(val) { };
union
{
__int128 bigMin;
int64_t min;
};
union
{
__int128 bigMax;
int64_t max;
};
uint64_t LBID;
bool valid;
};
@ -1878,8 +1897,9 @@ void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID)
vector<RGData> fromPrimProc;
bool validCPData;
int64_t min;
int64_t max;
bool hasBinaryColumn;
__int128 min;
__int128 max;
uint64_t lbid;
vector<_CPInfo> cpv;
uint32_t cachedIO;
@ -2154,7 +2174,7 @@ void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID)
fromPrimProc.clear();
fBPP->getRowGroupData(*bs, &fromPrimProc, &validCPData, &lbid, &min, &max,
&cachedIO, &physIO, &touchedBlocks, &unused, threadID);
&cachedIO, &physIO, &touchedBlocks, &unused, threadID, &hasBinaryColumn, fColType);
/* Another layer of messiness. Need to refactor this fcn. */
while (!fromPrimProc.empty() && !cancelled())
@ -2318,7 +2338,16 @@ void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID)
touchedBlocks_Thread += touchedBlocks;
if (fOid >= 3000 && ffirstStepType == SCAN && bop == BOP_AND)
{
if (fColType.colWidth <= 8)
{
cpv.push_back(_CPInfo((int64_t) min, (int64_t) max, lbid, validCPData));
}
else if (fColType.colWidth == 16)
{
cpv.push_back(_CPInfo(min, max, lbid, validCPData));
}
}
} // end of the per-rowgroup processing loop
// insert the resulting rowgroup data from a single bytestream into dlp
@ -2343,10 +2372,18 @@ void TupleBPS::receiveMultiPrimitiveMessages(uint32_t threadID)
cpMutex.lock();
for (i = 0; i < size; i++)
{
if (fColType.colWidth > 8)
{
lbidList->UpdateMinMax(cpv[i].bigMin, cpv[i].bigMax, cpv[i].LBID, fColType.colDataType,
cpv[i].valid);
}
else
{
lbidList->UpdateMinMax(cpv[i].min, cpv[i].max, cpv[i].LBID, fColType.colDataType,
cpv[i].valid);
}
}
cpMutex.unlock();
}
@ -2619,7 +2656,7 @@ out:
if (ffirstStepType == SCAN && bop == BOP_AND && !cancelled())
{
cpMutex.lock();
lbidList->UpdateAllPartitionInfo();
lbidList->UpdateAllPartitionInfo(fColType);
cpMutex.unlock();
}
}

View File

@ -235,15 +235,30 @@ struct PartitionInfo
{
int64_t min;
int64_t max;
union
{
__int128 bigMin;
int64_t min_;
};
union
{
__int128 bigMax;
int64_t max_;
};
uint64_t status;
PartitionInfo(): min((uint64_t)0x8000000000000001ULL),
max((uint64_t) - 0x8000000000000001LL),
status(0) {};
status(0)
{
DataConvert::int128Min(bigMin);
DataConvert::int128Max(bigMax);
};
};
typedef map<LogicalPartition, PartitionInfo> PartitionMap;
const string format(int64_t v, CalpontSystemCatalog::ColType& ct)
template <typename T>
const string format(T v, CalpontSystemCatalog::ColType& ct)
{
ostringstream oss;
@ -281,6 +296,8 @@ const string format(int64_t v, CalpontSystemCatalog::ColType& ct)
case CalpontSystemCatalog::BIGINT:
case CalpontSystemCatalog::DECIMAL:
case CalpontSystemCatalog::UDECIMAL:
{
if (ct.colWidth <= 8)
{
if (ct.scale > 0)
{
@ -289,7 +306,14 @@ const string format(int64_t v, CalpontSystemCatalog::ColType& ct)
}
else
{
oss << v;
oss << (int64_t) v;
}
}
else
{
char buf[MAX_DECIMAL_STRING_LENGTH];
DataConvert::decimalToString((__int128*)&v, (unsigned)ct.scale, buf, sizeof(buf), ct.colDataType);
oss << buf;
}
break;
@ -308,7 +332,7 @@ const string format(int64_t v, CalpontSystemCatalog::ColType& ct)
break;
default:
oss << v;
oss << (int64_t) v;
break;
}
@ -1075,7 +1099,13 @@ extern "C"
partInfo.status |= ET_DISABLED;
mapit = partMap.find(logicalPartNum);
int state = em.getExtentMaxMin(iter->range.start, partInfo.max, partInfo.min, seqNum);
int state;
if (ct.colWidth <= 8)
state = em.getExtentMaxMin(iter->range.start, partInfo.max, partInfo.min, seqNum);
else if (ct.colWidth == 16)
state = em.getExtentMaxMin(iter->range.start, partInfo.bigMax, partInfo.bigMin, seqNum);
// char column order swap for compare
if ((ct.colDataType == CalpontSystemCatalog::CHAR && ct.colWidth <= 8) ||
@ -1097,9 +1127,17 @@ extern "C"
if (mapit->second.status & CPINVALID)
continue;
if (ct.colWidth <= 8)
{
mapit->second.min = (partInfo.min < mapit->second.min ? partInfo.min : mapit->second.min);
mapit->second.max = (partInfo.max > mapit->second.max ? partInfo.max : mapit->second.max);
}
else if (ct.colWidth == 16)
{
mapit->second.bigMin = (partInfo.bigMin < mapit->second.bigMin ? partInfo.bigMin : mapit->second.bigMin);
mapit->second.bigMax = (partInfo.bigMax > mapit->second.bigMax ? partInfo.bigMax : mapit->second.bigMax);
}
}
}
}
}
@ -1118,13 +1156,29 @@ extern "C"
ostringstream output;
output.setf(ios::left, ios::adjustfield);
if (ct.colWidth <= 8)
{
output << setw(10) << "Part#"
<< setw(30) << "Min"
<< setw(30) << "Max" << "Status";
}
else
{
output << setw(10) << "Part#"
<< setw(40) << "Min"
<< setw(40) << "Max" << "Status";
}
int64_t maxLimit = numeric_limits<int64_t>::max();
int64_t minLimit = numeric_limits<int64_t>::min();
__int128 bigMaxLimit, bigMinLimit;
DataConvert::int128Max(bigMaxLimit);
DataConvert::int128Min(bigMinLimit);
unsigned __int128 ubigMaxLimit, ubigMinLimit;
DataConvert::uint128Max(ubigMaxLimit);
ubigMinLimit = 0;
// char column order swap for compare in subsequent loop
if ((ct.colDataType == CalpontSystemCatalog::CHAR && ct.colWidth <= 8) ||
(ct.colDataType == CalpontSystemCatalog::VARCHAR && ct.colWidth <= 7))
@ -1143,11 +1197,16 @@ extern "C"
if (partIt->second.status & CPINVALID)
{
if (ct.colWidth <= 8)
output << setw(30) << "N/A" << setw(30) << "N/A";
else
output << setw(40) << "N/A" << setw(40) << "N/A";
}
else
{
if ((isUnsigned(ct.colDataType)))
{
if (ct.colWidth <= 8)
{
if (static_cast<uint64_t>(partIt->second.min) == numeric_limits<uint64_t>::max()
&& static_cast<uint64_t>(partIt->second.max) == numeric_limits<uint64_t>::min())
@ -1156,12 +1215,31 @@ extern "C"
output << setw(30) << format(partIt->second.min, ct) << setw(30) << format(partIt->second.max, ct);
}
else
{
if (static_cast<unsigned __int128>(partIt->second.bigMin) == ubigMaxLimit
&& static_cast<uint64_t>(partIt->second.bigMax) == ubigMinLimit)
output << setw(40) << "Empty/Null" << setw(40) << "Empty/Null";
else
output << setw(40) << format(partIt->second.bigMin, ct) << setw(40) << format(partIt->second.bigMax, ct);
}
}
else
{
if (ct.colWidth <= 8)
{
if (partIt->second.min == maxLimit && partIt->second.max == minLimit)
output << setw(30) << "Empty/Null" << setw(30) << "Empty/Null";
else
output << setw(30) << format(partIt->second.min, ct) << setw(30) << format(partIt->second.max, ct);
}
else
{
if (partIt->second.bigMin == bigMaxLimit && partIt->second.bigMax == bigMinLimit)
output << setw(40) << "Empty/Null" << setw(40) << "Empty/Null";
else
output << setw(40) << format(partIt->second.bigMin, ct) << setw(40) << format(partIt->second.bigMax, ct);
}
}
}
if (partIt->second.status & ET_DISABLED)

View File

@ -616,7 +616,7 @@ inline bool isMinMaxValid(const NewColRequestHeader* in)
case CalpontSystemCatalog::DECIMAL:
case CalpontSystemCatalog::UDECIMAL:
return (in->DataSize <= 8);
return (in->DataSize <= 16);
default:
return false;
@ -906,8 +906,8 @@ inline uint8_t* nextBinColValue(int type,
*rid = ridArray[(*index)++];
}
*isNull = isNullVal<W>(type, val8);
*isEmpty = isEmptyVal<W>(type, val8);
*isNull = isNullVal<W>(type, &val8[*rid * W]);
*isEmpty = isEmptyVal<W>(type, &val8[*rid * W]);
//cout << "nextUnsignedColValue index " << *index << " rowid " << *rid << endl;
// at this point, nextRid is the index to return, and index is...
// if RIDs are not specified, nextRid + 1,
@ -1540,11 +1540,6 @@ inline void p_Col_ridArray(NewColRequestHeader* in,
using uint128_t = unsigned __int128;
using int128_t = __int128;
struct uint128_pod {
uint64_t lo;
uint64_t hi;
};
// for BINARY
template<int W>
inline void p_Col_bin_ridArray(NewColRequestHeader* in,
@ -1560,9 +1555,9 @@ inline void p_Col_bin_ridArray(NewColRequestHeader* in,
placeholderRegex.used = false;
//FIXME: pCol is setting it to 8192 cause logicalBlockMode is true
if(itemsPerBlk == BLOCK_SIZE){
/*if(itemsPerBlk == BLOCK_SIZE){
itemsPerBlk = BLOCK_SIZE/W;
}
}*/
if (in->NVALS > 0)
ridArray = reinterpret_cast<uint16_t*>(&in8[sizeof(NewColRequestHeader) +
@ -1588,13 +1583,13 @@ inline void p_Col_bin_ridArray(NewColRequestHeader* in,
{
if (isUnsigned((CalpontSystemCatalog::ColDataType)in->DataType))
{
out->Min = static_cast<int64_t>(numeric_limits<uint64_t>::max());
out->Min = -1;
out->Max = 0;
}
else
{
out->Min = numeric_limits<int64_t>::max();
out->Max = numeric_limits<int64_t>::min();
dataconvert::DataConvert::int128Max(out->Min);
dataconvert::DataConvert::int128Min(out->Max);
}
}
else
@ -1606,7 +1601,6 @@ inline void p_Col_bin_ridArray(NewColRequestHeader* in,
typedef char binWtype [W];
const ColArgs* args = NULL;
int64_t val = 0;
binWtype* bval;
int nextRidIndex = 0, argIndex = 0;
bool done = false, cmp = false, isNull = false, isEmpty = false;
@ -1655,16 +1649,14 @@ inline void p_Col_bin_ridArray(NewColRequestHeader* in,
bval = (binWtype*)nextBinColValue<W>(in->DataType, ridArray, in->NVALS, &nextRidIndex, &done, &isNull,
&isEmpty, &rid, in->OutputType, reinterpret_cast<uint8_t*>(block), itemsPerBlk);
uint128_t val;
dataconvert::Int128Pod_t *valPod;
valPod = reinterpret_cast<dataconvert::Int128Pod_t*>(&val);
while (!done)
{
// if((*((uint64_t *) (bval))) != 0)
// {
// cout << "rid "<< rid << " value ";
// if(W > 16) printf("%016X%016X ",( *(((uint64_t *) (bval)) +3)),(*(((uint64_t *) (bval)) +2)));
// printf("%016X%016X ",( *(((uint64_t *) (bval)) +1)),(*((uint64_t *) (bval))) );
//
// cout << endl;
// }
valPod->lo = *reinterpret_cast<uint64_t*>(*bval);
valPod->hi = *(reinterpret_cast<uint64_t*>(*bval) + 1);
if (cops == NULL) // implies parsedColumnFilter && columnFilterMode == SET
{
@ -1701,13 +1693,9 @@ inline void p_Col_bin_ridArray(NewColRequestHeader* in,
// if((*((uint64_t *) (uval))) != 0) cout << "comparing " << dec << (*((uint64_t *) (uval))) << " to " << (*((uint64_t *) (argVals[argIndex]))) << endl;
// WIP MCOL-641
uint128_t val, filterVal;
uint128_pod *valPod, *filterValPod;
valPod = reinterpret_cast<uint128_pod*>(&val);
filterValPod = reinterpret_cast<uint128_pod*>(&filterVal);
valPod->lo = *reinterpret_cast<uint64_t*>(*bval);
valPod->hi = *(reinterpret_cast<uint64_t*>(*bval) + 1);
uint128_t filterVal;
dataconvert::Int128Pod_t *filterValPod;
filterValPod = reinterpret_cast<dataconvert::Int128Pod_t*>(&filterVal);
filterValPod->lo = *reinterpret_cast<uint64_t*>(argVals[argIndex]);
filterValPod->hi = *(reinterpret_cast<uint64_t*>(argVals[argIndex]) + 1);
@ -1767,6 +1755,35 @@ inline void p_Col_bin_ridArray(NewColRequestHeader* in,
}
}
// Set the min and max if necessary. Ignore nulls.
if (out->ValidMinMax && !isNull && !isEmpty)
{
if (in->DataType == CalpontSystemCatalog::CHAR || in->DataType == CalpontSystemCatalog::VARCHAR)
{
if (colCompare(out->Min, val, COMPARE_GT, false, in->DataType, W, placeholderRegex))
out->Min = val;
if (colCompare(out->Max, val, COMPARE_LT, false, in->DataType, W, placeholderRegex))
out->Max = val;
}
else if (isUnsigned((CalpontSystemCatalog::ColDataType)in->DataType))
{
if (static_cast<unsigned __int128>(out->Min) > val)
out->Min = static_cast<unsigned __int128>(val);
if (static_cast<unsigned __int128>(out->Max) < val)
out->Max = static_cast<unsigned __int128>(val);;
}
else
{
if (out->Min > (__int128) val)
out->Min = (__int128) val;
if (out->Max < (__int128) val)
out->Max = (__int128) val;
}
}
bval = (binWtype*)nextBinColValue<W>(in->DataType, ridArray, in->NVALS, &nextRidIndex, &done, &isNull,
&isEmpty, &rid, in->OutputType, reinterpret_cast<uint8_t*>(block), itemsPerBlk);

View File

@ -111,6 +111,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor() :
minVal(MAX64),
maxVal(MIN64),
lbidForCP(0),
hasBinaryColumn(false),
busyLoaderCount(0),
physIO(0),
cachedIO(0),
@ -155,6 +156,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream& b, double prefetch,
minVal(MAX64),
maxVal(MIN64),
lbidForCP(0),
hasBinaryColumn(false),
busyLoaderCount(0),
physIO(0),
cachedIO(0),
@ -574,6 +576,7 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
/* skip the header */
bs.advance(sizeof(ISMPacketHeader) + 3 * sizeof(uint32_t));
// TODO MCOL-641
bs >> count;
bs >> startPos;
@ -1082,8 +1085,16 @@ void BatchPrimitiveProcessor::initProcessor()
fAggregator->setInputOutput(fe2 ? fe2Output : outputRG, &fAggregateRG);
}
if (!hasBinaryColumn)
{
minVal = MAX64;
maxVal = MIN64;
}
else
{
dataconvert::DataConvert::int128Min(bigMaxVal);
dataconvert::DataConvert::int128Max(bigMinVal);
}
// @bug 1269, initialize data used by execute() for async loading blocks
// +1 for the scan filter step with no predicate, if any
@ -1970,9 +1981,19 @@ void BatchPrimitiveProcessor::writeProjectionPreamble()
{
*serialized << (uint8_t) 1;
*serialized << lbidForCP;
if (hasBinaryColumn)
{
*serialized << (uint8_t) 16; // width of min/max value
*serialized << (unsigned __int128) bigMinVal;
*serialized << (unsigned __int128) bigMaxVal;
}
else
{
*serialized << (uint8_t) 8; // width of min/max value
*serialized << (uint64_t) minVal;
*serialized << (uint64_t) maxVal;
}
}
else
{
*serialized << (uint8_t) 0;
@ -2060,9 +2081,19 @@ void BatchPrimitiveProcessor::makeResponse()
{
*serialized << (uint8_t) 1;
*serialized << lbidForCP;
if (hasBinaryColumn)
{
*serialized << (uint8_t) 16; // width of min/max value
*serialized << (unsigned __int128) bigMinVal;
*serialized << (unsigned __int128) bigMaxVal;
}
else
{
*serialized << (uint8_t) 8; // width of min/max value
*serialized << (uint64_t) minVal;
*serialized << (uint64_t) maxVal;
}
}
else
{
*serialized << (uint8_t) 0;
@ -2164,8 +2195,16 @@ int BatchPrimitiveProcessor::operator()()
}
allocLargeBuffers();
if (!hasBinaryColumn)
{
minVal = MAX64;
maxVal = MIN64;
}
else
{
dataconvert::DataConvert::int128Min(bigMaxVal);
dataconvert::DataConvert::int128Max(bigMinVal);
}
validCPData = false;
#ifdef PRIMPROC_STOPWATCH
stopwatch->start("BPP() execute");

View File

@ -211,8 +211,8 @@ private:
bool needStrValues;
/* Common space for primitive data */
static const uint32_t BUFFER_SIZE = 65536;
uint8_t blockData[BLOCK_SIZE * 8];
static const uint32_t BUFFER_SIZE = 131072;
uint8_t blockData[BLOCK_SIZE * 16];
boost::scoped_array<uint8_t> outputMsg;
uint32_t outMsgSize;
@ -228,9 +228,21 @@ private:
bool hasScan;
bool validCPData;
int64_t minVal, maxVal; // CP data from a scanned column
uint64_t lbidForCP;
// CP data from a scanned column
union
{
__int128 bigMinVal;
int64_t minVal;
};
union
{
__int128 bigMaxVal;
int64_t maxVal;
};
uint64_t lbidForCP;
// MCOL-641
bool hasBinaryColumn;
// IO counters
boost::mutex counterLock;
uint32_t busyLoaderCount;

View File

@ -206,6 +206,12 @@ void ColumnCommand::loadData()
ByteStream::octbyte o = getEmptyRowValue(colType.colDataType, colType.colWidth);
oPtr[idx] = o;
}
else if (colType.colWidth == 16)
{
uint64_t *ptr = reinterpret_cast<uint64_t*>(&bpp->blockData[i * BLOCK_SIZE] + (idx*16) );
*ptr = joblist::BINARYEMPTYROW;
*(ptr + 1) = joblist::BINARYEMPTYROW;
}
}
}// else
@ -256,8 +262,17 @@ void ColumnCommand::issuePrimitive()
//if (wasVersioned && outMsg->ValidMinMax)
// cout << "CC: versioning overriding min max data\n";
bpp->lbidForCP = lbid;
bpp->maxVal = outMsg->Max;
bpp->minVal = outMsg->Min;
if (primMsg->DataSize > 8)
{
bpp->hasBinaryColumn = true;
bpp->bigMaxVal = outMsg->Max;
bpp->bigMinVal = outMsg->Min;
}
else
{
bpp->maxVal = static_cast<int64_t>(outMsg->Max);
bpp->minVal = static_cast<int64_t>(outMsg->Min);
}
}
} // issuePrimitive()

View File

@ -408,11 +408,31 @@ int clearAllCPData()
if (err == 0 && ranges.size() > 0)
{
// Get the extents for a given OID to determine it's column width
std::vector<struct EMEntry> entries;
CHECK(emp->getExtents(oid, entries, false, false, true));
if (entries.empty())
continue;
bool isBinaryColumn = entries[0].colWid > 8;
BRM::CPInfo cpInfo;
BRM::CPInfoList_t vCpInfo;
if (!isBinaryColumn)
{
cpInfo.max = numeric_limits<int64_t>::min();
cpInfo.min = numeric_limits<int64_t>::max();
}
else
{
dataconvert::DataConvert::int128Min(cpInfo.bigMax);
dataconvert::DataConvert::int128Max(cpInfo.bigMin);
}
cpInfo.seqNum = -1;
cpInfo.isBinaryColumn = isBinaryColumn;
for (uint32_t i = 0; i < ranges.size(); i++)
{
@ -433,17 +453,38 @@ int clearAllCPData()
//------------------------------------------------------------------------------
int clearmm(OID_t oid)
{
BRM::LBIDRange_v ranges;
CHECK(emp->lookup(oid, ranges));
BRM::LBIDRange_v::size_type rcount = ranges.size();
// Get the extents for a given OID to determine it's column width
std::vector<struct EMEntry> entries;
CHECK(emp->getExtents(oid, entries, false, false, true));
if (entries.empty())
{
cerr << "There are no entries in the Extent Map for OID: " << oid << endl;
return 1;
}
bool isBinaryColumn = entries[0].colWid > 8;
// @bug 2280. Changed to use the batch interface to clear the CP info to make the clear option faster.
BRM::CPInfo cpInfo;
BRM::CPInfoList_t vCpInfo;
if (!isBinaryColumn)
{
cpInfo.max = numeric_limits<int64_t>::min();
cpInfo.min = numeric_limits<int64_t>::max();
}
else
{
dataconvert::DataConvert::int128Min(cpInfo.bigMax);
dataconvert::DataConvert::int128Max(cpInfo.bigMin);
}
cpInfo.seqNum = -1;
cpInfo.isBinaryColumn = isBinaryColumn;
for (unsigned i = 0; i < rcount; i++)
{

View File

@ -1163,22 +1163,12 @@ bool stringToTimestampStruct(const string& data, TimeStamp& timeStamp, const str
}
// WIP MCOL-641
#include <stdio.h>
struct uint128_pod
{
uint64_t lo;
uint64_t hi;
};
// WIP MCOL-641
// Check for overflows with buflen
template<typename T>
void DataConvert::toString(T* dec, char *p, size_t buflen)
{
uint64_t div = 10000000000000000000ULL;
size_t div_log = 19;
// template this
uint128_t high = *dec;
uint128_t low;
@ -1192,23 +1182,27 @@ void DataConvert::toString(T* dec, char *p, size_t buflen)
// use typeof
// Or a templated structure
// Use uint64* to access parts of uint128 and remove pods
uint128_pod *high_pod = reinterpret_cast<uint128_pod*>(&high);
uint128_pod *mid_pod = reinterpret_cast<uint128_pod*>(&mid);
uint128_pod *low_pod = reinterpret_cast<uint128_pod*>(&low);
Int128Pod_t *high_pod = reinterpret_cast<Int128Pod_t*>(&high);
Int128Pod_t *mid_pod = reinterpret_cast<Int128Pod_t*>(&mid);
Int128Pod_t *low_pod = reinterpret_cast<Int128Pod_t*>(&low);
char* original_p = p;
int printed_chars = 0;
// WIP replace snprintf with streams
if (high_pod->lo != 0) {
printed_chars = snprintf(p, div_log+1, "%lu", high_pod->lo);
printed_chars = sprintf(p, "%lu", high_pod->lo);
p += printed_chars;
printed_chars = snprintf(p, div_log+1, "%019lu", mid_pod->lo);
printed_chars = sprintf(p, "%019lu", mid_pod->lo);
p += printed_chars;
sprintf(p, "%019lu", low_pod->lo);
} else if (mid_pod->lo != 0) {
printed_chars = snprintf(p, div_log+1, "%lu", mid_pod->lo);
printed_chars = sprintf(p, "%lu", mid_pod->lo);
p += printed_chars;
sprintf(p, "%019lu", low_pod->lo);
}
else {
sprintf(p, "%lu", low_pod->lo);
}
snprintf(p, div_log+1, "%019lu", low_pod->lo);
if (buflen <= p-original_p)
std::cout << "DataConvert::toString char buffer overflow" << std::endl;
}

View File

@ -134,9 +134,23 @@ const int32_t MIN_TIMESTAMP_VALUE = 0;
namespace dataconvert
{
// Decimal has maximum 38 digits with 3 extra chars for dot(.), minus(-), null character(\0)
const int MAX_DECIMAL_STRING_LENGTH = 41;
// WIP MCOL-641
using int128_t = __int128;
using uint128_t = unsigned __int128;
struct Int128Pod_struct
{
uint64_t lo;
uint64_t hi;
};
typedef Int128Pod_struct Int128Pod_t;
void atoi128(const std::string& arg, int128_t& res);
enum CalpontDateTimeFormat
{
CALPONTDATE_ENUM = 1, // date format is: "YYYY-MM-DD"
@ -1022,6 +1036,28 @@ public:
template <typename T>
EXPORT static void toString(T* dec, char *p, size_t buflen);
static inline void int128Max(int128_t& i)
{
Int128Pod_t *pod = reinterpret_cast<Int128Pod_t*>(&i);
pod->lo = 0xFFFFFFFFFFFFFFFF;
pod->hi = 0x7FFFFFFFFFFFFFFF;
}
static inline void int128Min(int128_t& i)
{
Int128Pod_t *pod = reinterpret_cast<Int128Pod_t*>(&i);
pod->lo = 0;
pod->hi = 0x8000000000000000;
}
static inline void uint128Max(uint128_t& i)
{
Int128Pod_t *pod = reinterpret_cast<Int128Pod_t*>(&i);
pod->lo = 0xFFFFFFFFFFFFFFFF;
pod->hi = 0xFFFFFFFFFFFFFFFF;
}
static inline std::string constructRegexp(const std::string& str);
static inline void trimWhitespace(int64_t& charData);
static inline bool isEscapedChar(char c)

View File

@ -151,6 +151,17 @@ struct CPInfo
int64_t max;
int64_t min;
int32_t seqNum;
union
{
__int128 bigMax;
int64_t max_;
};
union
{
__int128 bigMin;
int64_t min_;
};
bool isBinaryColumn;
};
typedef std::vector<CPInfo> CPInfoList_t;
@ -160,6 +171,17 @@ struct CPMaxMin
int64_t max;
int64_t min;
int32_t seqNum;
union
{
__int128 bigMax;
int64_t max_;
};
union
{
__int128 bigMin;
int64_t min_;
};
bool isBinaryColumn;
};
typedef std::tr1::unordered_map<LBID_t, CPMaxMin> CPMaxMinMap_t;
@ -172,7 +194,18 @@ struct CPInfoMerge
int64_t min; // min value to be merged with current min value
int32_t seqNum; // sequence number (not currently used)
execplan::CalpontSystemCatalog::ColDataType type;
int32_t colWidth;
bool newExtent; // is this to be treated as a new extent
union
{
__int128 bigMax;
int64_t max_;
};
union
{
__int128 bigMin;
int64_t min_;
};
};
typedef std::vector<CPInfoMerge> CPInfoMergeList_t;
@ -184,7 +217,18 @@ struct CPMaxMinMerge
int64_t min;
int32_t seqNum;
execplan::CalpontSystemCatalog::ColDataType type;
int32_t colWidth;
bool newExtent;
union
{
__int128 bigMax;
int64_t max_;
};
union
{
__int128 bigMin;
int64_t min_;
};
};
typedef std::tr1::unordered_map<LBID_t, CPMaxMinMerge> CPMaxMinMergeMap_t;

View File

@ -30,6 +30,7 @@
//#define NDEBUG
#include <cassert>
#include "dataconvert.h"
#include "oamcache.h"
#include "rwlock.h"
#include "mastersegmenttable.h"
@ -461,7 +462,8 @@ int DBRM::markExtentsInvalid(const vector<LBID_t>& lbids,
return err;
}
int DBRM::getExtentMaxMin(const LBID_t lbid, int64_t& max, int64_t& min, int32_t& seqNum) throw()
template <typename T>
int DBRM::getExtentMaxMin(const LBID_t lbid, T& max, T& min, int32_t& seqNum) throw()
{
#ifdef BRM_INFO
@ -556,7 +558,14 @@ int DBRM::setExtentsMaxMin(const CPInfoList_t& cpInfos) DBRM_THROW
for (it = cpInfos.begin(); it != cpInfos.end(); it++)
{
command << (uint64_t)it->firstLbid << (uint64_t)it->max << (uint64_t)it->min << (uint32_t)it->seqNum;
if (it->isBinaryColumn)
{
command << (uint8_t)1 << (uint64_t)it->firstLbid << (unsigned __int128)it->bigMax << (unsigned __int128)it->bigMin << (uint32_t)it->seqNum;
}
else
{
command << (uint8_t)0 << (uint64_t)it->firstLbid << (uint64_t)it->max << (uint64_t)it->min << (uint32_t)it->seqNum;
}
}
err = send_recv(command, response);
@ -4526,7 +4535,12 @@ void DBRM::invalidateUncommittedExtentLBIDs(execplan::CalpontSystemCatalog::SCN
// lookup the column oid for that lbid (all we care about is oid here)
if (em->lookupLocal(lbid, oid, dbRoot, partitionNum, segmentNum, fileBlockOffset) == 0)
{
if (execplan::isUnsigned(csc->colType(oid).colDataType))
execplan::CalpontSystemCatalog::ColType colType = csc->colType(oid);
bool isBinaryColumn = colType.colWidth > 8;
aInfo.isBinaryColumn = isBinaryColumn;
if (!isBinaryColumn)
{
if (execplan::isUnsigned(colType.colDataType))
{
aInfo.max = 0;
aInfo.min = numeric_limits<uint64_t>::max();
@ -4538,10 +4552,26 @@ void DBRM::invalidateUncommittedExtentLBIDs(execplan::CalpontSystemCatalog::SCN
}
}
else
{
if (execplan::isUnsigned(colType.colDataType))
{
aInfo.bigMax = 0;
aInfo.bigMin = -1;
}
else
{
dataconvert::DataConvert::int128Min(aInfo.bigMax);
dataconvert::DataConvert::int128Max(aInfo.bigMin);
}
}
}
else
{
// We have a problem, but we need to put something in. This should never happen.
aInfo.max = numeric_limits<int64_t>::min();
aInfo.min = numeric_limits<int64_t>::max();
// MCOL-641 is this correct?
aInfo.isBinaryColumn = false;
}
aInfo.seqNum = -2;
@ -4552,4 +4582,10 @@ void DBRM::invalidateUncommittedExtentLBIDs(execplan::CalpontSystemCatalog::SCN
setExtentsMaxMin(cpInfos);
}
template
int DBRM::getExtentMaxMin<__int128>(const LBID_t lbid, __int128& max, __int128& min, int32_t& seqNum) throw();
template
int DBRM::getExtentMaxMin<int64_t>(const LBID_t lbid, int64_t& max, int64_t& min, int32_t& seqNum) throw();
} //namespace

View File

@ -822,7 +822,8 @@ public:
execplan::CalpontSystemCatalog::ColDataType colDataType) DBRM_THROW;
EXPORT int markExtentsInvalid(const std::vector<LBID_t>& lbids,
const std::vector<execplan::CalpontSystemCatalog::ColDataType>& colDataTypes) DBRM_THROW;
EXPORT int getExtentMaxMin(const LBID_t lbid, int64_t& max, int64_t& min, int32_t& seqNum) throw();
template <typename T>
EXPORT int getExtentMaxMin(const LBID_t lbid, T& max, T& min, int32_t& seqNum) throw();
EXPORT int setExtentMaxMin(const LBID_t lbid, const int64_t max, const int64_t min, const int32_t seqNum) DBRM_THROW;

View File

@ -116,8 +116,10 @@ namespace BRM
EMCasualPartition_struct::EMCasualPartition_struct()
{
lo_val = numeric_limits<int64_t>::min();
hi_val = numeric_limits<int64_t>::max();
lo_val = numeric_limits<int64_t>::max();
hi_val = numeric_limits<int64_t>::min();
dataconvert::DataConvert::int128Max(bigLoVal);
dataconvert::DataConvert::int128Min(bigHiVal);
sequenceNum = 0;
isValid = CP_INVALID;
}
@ -130,10 +132,20 @@ EMCasualPartition_struct::EMCasualPartition_struct(const int64_t lo, const int64
isValid = CP_INVALID;
}
EMCasualPartition_struct::EMCasualPartition_struct(const __int128 bigLo, const __int128 bigHi, const int32_t seqNum)
{
bigLoVal = bigLo;
bigHiVal = bigHi;
sequenceNum = seqNum;
isValid = CP_INVALID;
}
EMCasualPartition_struct::EMCasualPartition_struct(const EMCasualPartition_struct& em)
{
lo_val = em.lo_val;
hi_val = em.hi_val;
bigLoVal = em.bigLoVal;
bigHiVal = em.bigHiVal;
sequenceNum = em.sequenceNum;
isValid = em.isValid;
}
@ -142,6 +154,8 @@ EMCasualPartition_struct& EMCasualPartition_struct::operator= (const EMCasualPar
{
lo_val = em.lo_val;
hi_val = em.hi_val;
bigLoVal = em.bigLoVal;
bigHiVal = em.bigHiVal;
sequenceNum = em.sequenceNum;
isValid = em.isValid;
return *this;
@ -339,11 +353,15 @@ int ExtentMap::_markInvalid(const LBID_t lbid, const execplan::CalpontSystemCata
{
fExtentMap[i].partition.cprange.lo_val = numeric_limits<uint64_t>::max();
fExtentMap[i].partition.cprange.hi_val = 0;
fExtentMap[i].partition.cprange.bigLoVal = -1;
fExtentMap[i].partition.cprange.bigHiVal = 0;
}
else
{
fExtentMap[i].partition.cprange.lo_val = numeric_limits<int64_t>::max();
fExtentMap[i].partition.cprange.hi_val = numeric_limits<int64_t>::min();
dataconvert::DataConvert::int128Max(fExtentMap[i].partition.cprange.bigLoVal);
dataconvert::DataConvert::int128Min(fExtentMap[i].partition.cprange.bigHiVal);
}
incSeqNum(fExtentMap[i].partition.cprange.sequenceNum);
@ -460,6 +478,8 @@ int ExtentMap::markInvalid(const vector<LBID_t>& lbids,
**/
// TODO MCOL-641 Not adding support here since this function appears to be unused anywhere.
int ExtentMap::setMaxMin(const LBID_t lbid,
const int64_t max,
const int64_t min,
@ -626,8 +646,16 @@ void ExtentMap::setExtentsMaxMin(const CPMaxMinMap_t& cpMap, bool firstNode, boo
fExtentMap[i].partition.cprange.isValid == CP_INVALID)
{
makeUndoRecord(&fExtentMap[i], sizeof(struct EMEntry));
if (it->second.isBinaryColumn)
{
fExtentMap[i].partition.cprange.bigHiVal = it->second.bigMax;
fExtentMap[i].partition.cprange.bigLoVal = it->second.bigMin;
}
else
{
fExtentMap[i].partition.cprange.hi_val = it->second.max;
fExtentMap[i].partition.cprange.lo_val = it->second.min;
}
fExtentMap[i].partition.cprange.isValid = CP_VALID;
incSeqNum(fExtentMap[i].partition.cprange.sequenceNum);
extentsUpdated++;
@ -663,8 +691,16 @@ void ExtentMap::setExtentsMaxMin(const CPMaxMinMap_t& cpMap, bool firstNode, boo
else if (it->second.seqNum == -2)
{
makeUndoRecord(&fExtentMap[i], sizeof(struct EMEntry));
if (it->second.isBinaryColumn)
{
fExtentMap[i].partition.cprange.bigHiVal = it->second.bigMax;
fExtentMap[i].partition.cprange.bigLoVal = it->second.bigMin;
}
else
{
fExtentMap[i].partition.cprange.hi_val = it->second.max;
fExtentMap[i].partition.cprange.lo_val = it->second.min;
}
fExtentMap[i].partition.cprange.isValid = CP_INVALID;
incSeqNum(fExtentMap[i].partition.cprange.sequenceNum);
extentsUpdated++;
@ -707,6 +743,7 @@ void ExtentMap::mergeExtentsMaxMin(CPMaxMinMergeMap_t& cpMap, bool useLock)
{
CPMaxMinMergeMap_t::const_iterator it;
// TODO MCOL-641 Add support in the debugging outputs here.
#ifdef BRM_DEBUG
log("ExtentMap::mergeExtentsMaxMin()", logging::LOG_TYPE_DEBUG);
@ -784,13 +821,15 @@ void ExtentMap::mergeExtentsMaxMin(CPMaxMinMergeMap_t& cpMap, bool useLock)
log(os.str(), logging::LOG_TYPE_DEBUG);
#endif
bool isBinaryColumn = it->second.colWidth > 8;
switch (fExtentMap[i].partition.cprange.isValid)
{
// Merge input min/max with current min/max
case CP_VALID:
{
if (!isValidCPRange( it->second.max,
it->second.min,
if (!isValidCPRange( !isBinaryColumn ? it->second.max : it->second.bigMax,
!isBinaryColumn ? it->second.min : it->second.bigMin,
it->second.type ))
{
break;
@ -804,8 +843,8 @@ void ExtentMap::mergeExtentsMaxMin(CPMaxMinMergeMap_t& cpMap, bool useLock)
// min/max needs to be set instead of merged.
if (isValidCPRange(
fExtentMap[i].partition.cprange.hi_val,
fExtentMap[i].partition.cprange.lo_val,
!isBinaryColumn ? fExtentMap[i].partition.cprange.hi_val : fExtentMap[i].partition.cprange.bigHiVal,
!isBinaryColumn ? fExtentMap[i].partition.cprange.lo_val : fExtentMap[i].partition.cprange.bigLoVal,
it->second.type))
{
// Swap byte order to do binary string comparison
@ -835,6 +874,8 @@ void ExtentMap::mergeExtentsMaxMin(CPMaxMinMergeMap_t& cpMap, bool useLock)
it->second.max;
}
else if (isUnsigned(it->second.type))
{
if (!isBinaryColumn)
{
if (static_cast<uint64_t>(it->second.min) <
static_cast<uint64_t>(fExtentMap[i].partition.cprange.lo_val))
@ -851,6 +892,25 @@ void ExtentMap::mergeExtentsMaxMin(CPMaxMinMergeMap_t& cpMap, bool useLock)
}
}
else
{
if (static_cast<unsigned __int128>(it->second.bigMin) <
static_cast<unsigned __int128>(fExtentMap[i].partition.cprange.bigLoVal))
{
fExtentMap[i].partition.cprange.bigLoVal =
it->second.bigMin;
}
if (static_cast<unsigned __int128>(it->second.bigMax) >
static_cast<unsigned __int128>(fExtentMap[i].partition.cprange.bigHiVal))
{
fExtentMap[i].partition.cprange.bigHiVal =
it->second.bigMax;
}
}
}
else
{
if (!isBinaryColumn)
{
if (it->second.min <
fExtentMap[i].partition.cprange.lo_val)
@ -862,14 +922,37 @@ void ExtentMap::mergeExtentsMaxMin(CPMaxMinMergeMap_t& cpMap, bool useLock)
fExtentMap[i].partition.cprange.hi_val =
it->second.max;
}
else
{
if (it->second.bigMin <
fExtentMap[i].partition.cprange.bigLoVal)
fExtentMap[i].partition.cprange.bigLoVal =
it->second.bigMin;
if (it->second.bigMax >
fExtentMap[i].partition.cprange.bigHiVal)
fExtentMap[i].partition.cprange.bigHiVal =
it->second.bigMax;
}
}
}
else
{
if (!isBinaryColumn)
{
fExtentMap[i].partition.cprange.lo_val =
it->second.min;
fExtentMap[i].partition.cprange.hi_val =
it->second.max;
}
else
{
fExtentMap[i].partition.cprange.bigLoVal =
it->second.bigMin;
fExtentMap[i].partition.cprange.bigHiVal =
it->second.bigMax;
}
}
incSeqNum(fExtentMap[i].partition.cprange.sequenceNum);
@ -897,15 +980,25 @@ void ExtentMap::mergeExtentsMaxMin(CPMaxMinMergeMap_t& cpMap, bool useLock)
if (it->second.newExtent)
{
if (isValidCPRange( it->second.max,
it->second.min,
if (isValidCPRange( !isBinaryColumn ? it->second.max : it->second.bigMax,
!isBinaryColumn ? it->second.min : it->second.bigMin,
it->second.type ))
{
if (!isBinaryColumn)
{
fExtentMap[i].partition.cprange.lo_val =
it->second.min;
fExtentMap[i].partition.cprange.hi_val =
it->second.max;
}
else
{
fExtentMap[i].partition.cprange.bigLoVal =
it->second.bigMin;
fExtentMap[i].partition.cprange.bigHiVal =
it->second.bigMax;
}
}
// Even if invalid range; we set state to CP_VALID,
// because the extent is valid, it is just empty.
@ -939,9 +1032,12 @@ void ExtentMap::mergeExtentsMaxMin(CPMaxMinMergeMap_t& cpMap, bool useLock)
// Range is considered invalid if min or max, are NULL (min()), or EMPTY
// (min()+1). For unsigned types NULL is max() and EMPTY is max()-1.
//------------------------------------------------------------------------------
bool ExtentMap::isValidCPRange(int64_t max, int64_t min, execplan::CalpontSystemCatalog::ColDataType type) const
template <typename T>
bool ExtentMap::isValidCPRange(const T& max, const T& min, execplan::CalpontSystemCatalog::ColDataType type) const
{
if (isUnsigned(type))
{
if (typeid(T) != typeid(__int128))
{
if ( (static_cast<uint64_t>(min) >= (numeric_limits<uint64_t>::max() - 1)) ||
(static_cast<uint64_t>(max) >= (numeric_limits<uint64_t>::max() - 1)) )
@ -950,6 +1046,20 @@ bool ExtentMap::isValidCPRange(int64_t max, int64_t min, execplan::CalpontSystem
}
}
else
{
unsigned __int128 temp;
dataconvert::DataConvert::uint128Max(temp);
if ( (static_cast<unsigned __int128>(min) >= (temp - 1)) ||
(static_cast<unsigned __int128>(max) >= (temp - 1)) )
{
return false;
}
}
}
else
{
if (typeid(T) != typeid(__int128))
{
if ( (min <= (numeric_limits<int64_t>::min() + 1)) ||
(max <= (numeric_limits<int64_t>::min() + 1)) )
@ -957,6 +1067,18 @@ bool ExtentMap::isValidCPRange(int64_t max, int64_t min, execplan::CalpontSystem
return false;
}
}
else
{
__int128 temp;
dataconvert::DataConvert::int128Min(temp);
if ( (min <= (temp + 1)) ||
(max <= (temp + 1)) )
{
return false;
}
}
}
return true;
}
@ -969,9 +1091,9 @@ bool ExtentMap::isValidCPRange(int64_t max, int64_t min, execplan::CalpontSystem
* return the sequenceNum of the extent and the max/min values as -1.
**/
template <typename T>
int ExtentMap::getMaxMin(const LBID_t lbid,
int64_t& max,
int64_t& min,
T& max, T& min,
int32_t& seqNum)
{
#ifdef BRM_INFO
@ -987,8 +1109,19 @@ int ExtentMap::getMaxMin(const LBID_t lbid,
}
#endif
max = numeric_limits<uint64_t>::max();
min = 0;
if (typeid(T) == typeid(__int128))
{
__int128 tmpMax, tmpMin;
dataconvert::DataConvert::int128Min(tmpMax);
dataconvert::DataConvert::int128Max(tmpMin);
max = tmpMax;
min = tmpMin;
}
else
{
max = numeric_limits<int64_t>::min();
min = numeric_limits<int64_t>::max();
}
seqNum *= (-1);
int entries;
int i;
@ -1013,9 +1146,17 @@ int ExtentMap::getMaxMin(const LBID_t lbid,
(static_cast<LBID_t>(fExtentMap[i].range.size) * 1024) - 1;
if (lbid >= fExtentMap[i].range.start && lbid <= lastBlock)
{
if (typeid(T) == typeid(__int128))
{
max = fExtentMap[i].partition.cprange.bigHiVal;
min = fExtentMap[i].partition.cprange.bigLoVal;
}
else
{
max = fExtentMap[i].partition.cprange.hi_val;
min = fExtentMap[i].partition.cprange.lo_val;
}
seqNum = fExtentMap[i].partition.cprange.sequenceNum;
isValid = fExtentMap[i].partition.cprange.isValid;
releaseEMEntryTable(READ);
@ -2568,15 +2709,31 @@ LBID_t ExtentMap::_createColumnExtent_DBroot(uint32_t size, int OID,
e->fileID = OID;
if (isUnsigned(colDataType))
{
if (colWidth <= 8)
{
e->partition.cprange.lo_val = numeric_limits<uint64_t>::max();
e->partition.cprange.hi_val = 0;
}
else
{
e->partition.cprange.bigLoVal = -1;
e->partition.cprange.bigHiVal = 0;
}
}
else
{
if (colWidth <= 8)
{
e->partition.cprange.lo_val = numeric_limits<int64_t>::max();
e->partition.cprange.hi_val = numeric_limits<int64_t>::min();
}
else
{
dataconvert::DataConvert::int128Max(e->partition.cprange.bigLoVal);
dataconvert::DataConvert::int128Min(e->partition.cprange.bigHiVal);
}
}
e->partition.cprange.sequenceNum = 0;
@ -2763,15 +2920,31 @@ LBID_t ExtentMap::_createColumnExtentExactFile(uint32_t size, int OID,
e->fileID = OID;
if (isUnsigned(colDataType))
{
if (colWidth <= 8)
{
e->partition.cprange.lo_val = numeric_limits<uint64_t>::max();
e->partition.cprange.hi_val = 0;
}
else
{
e->partition.cprange.bigLoVal = -1;
e->partition.cprange.bigHiVal = 0;
}
}
else
{
if (colWidth <= 8)
{
e->partition.cprange.lo_val = numeric_limits<int64_t>::max();
e->partition.cprange.hi_val = numeric_limits<int64_t>::min();
}
else
{
dataconvert::DataConvert::int128Max(e->partition.cprange.bigLoVal);
dataconvert::DataConvert::int128Min(e->partition.cprange.bigHiVal);
}
}
e->partition.cprange.sequenceNum = 0;
@ -2955,6 +3128,8 @@ LBID_t ExtentMap::_createDictStoreExtent(uint32_t size, int OID,
e->status = EXTENTUNAVAILABLE;// @bug 1911 mark extent as in process
e->partition.cprange.lo_val = numeric_limits<int64_t>::max();
e->partition.cprange.hi_val = numeric_limits<int64_t>::min();
dataconvert::DataConvert::int128Max(e->partition.cprange.bigLoVal);
dataconvert::DataConvert::int128Min(e->partition.cprange.bigHiVal);
e->partition.cprange.sequenceNum = 0;
e->partition.cprange.isValid = CP_INVALID;
@ -5840,6 +6015,12 @@ void ExtentMap::dumpTo(ostream& os)
}
*/
template
int ExtentMap::getMaxMin<__int128>(const LBID_t lbidRange, __int128& max, __int128& min, int32_t& seqNum);
template
int ExtentMap::getMaxMin<int64_t>(const LBID_t lbidRange, int64_t& max, int64_t& min, int32_t& seqNum);
} //namespace
// vim:ts=4 sw=4:

View File

@ -108,8 +108,19 @@ struct EMCasualPartition_struct
char isValid; //CP_INVALID - No min/max and no DML in progress. CP_UPDATING - Update in progress. CP_VALID- min/max is valid
EXPORT EMCasualPartition_struct();
EXPORT EMCasualPartition_struct(const int64_t lo, const int64_t hi, const int32_t seqNum);
EXPORT EMCasualPartition_struct(const __int128 bigLo, const __int128 bigHi, const int32_t seqNum);
EXPORT EMCasualPartition_struct(const EMCasualPartition_struct& em);
EXPORT EMCasualPartition_struct& operator= (const EMCasualPartition_struct& em);
union
{
__int128 bigLoVal;
int64_t loVal;
};
union
{
__int128 bigHiVal;
int64_t hiVal;
};
};
typedef EMCasualPartition_struct EMCasualPartition_t;
@ -854,7 +865,8 @@ public:
*/
void mergeExtentsMaxMin(CPMaxMinMergeMap_t& cpMap, bool useLock = true);
EXPORT int getMaxMin(const LBID_t lbidRange, int64_t& max, int64_t& min, int32_t& seqNum);
template <typename T>
EXPORT int getMaxMin(const LBID_t lbidRange, T& max, T& min, int32_t& seqNum);
inline bool empty()
{
@ -934,7 +946,8 @@ private:
uint16_t dbRoot,
uint32_t partitionNum,
uint16_t segmentNum);
bool isValidCPRange(int64_t max, int64_t min, execplan::CalpontSystemCatalog::ColDataType type) const;
template <typename T>
bool isValidCPRange(const T& max, const T& min, execplan::CalpontSystemCatalog::ColDataType type) const;
void deleteExtent(int emIndex);
LBID_t getLBIDsFromFreeList(uint32_t size);
void reserveLBIDRange(LBID_t start, uint8_t size); // used by load() to allocate pre-existing LBIDs

View File

@ -1329,9 +1329,12 @@ void SlaveComm::do_setExtentsMaxMin(ByteStream& msg)
LBID_t lbid;
uint64_t tmp64;
uint32_t tmp32;
uint8_t tmp8;
unsigned __int128 tmp128;
int err;
ByteStream reply;
int32_t updateCount;
bool isBinaryColumn = false;
#ifdef BRM_VERBOSE
cerr << "WorkerComm: do_setExtentsMaxMin()" << endl;
@ -1348,14 +1351,30 @@ void SlaveComm::do_setExtentsMaxMin(ByteStream& msg)
// Loop through extents and add each one to a map.
for (int64_t i = 0; i < updateCount; i++)
{
msg >> tmp8;
isBinaryColumn = (tmp8 != 0);
msg >> tmp64;
lbid = tmp64;
cpMaxMin.isBinaryColumn = isBinaryColumn;
if (isBinaryColumn)
{
msg >> tmp128;
cpMaxMin.bigMax = tmp128;
msg >> tmp128;
cpMaxMin.bigMin = tmp128;
}
else
{
msg >> tmp64;
cpMaxMin.max = tmp64;
msg >> tmp64;
cpMaxMin.min = tmp64;
}
msg >> tmp32;
cpMaxMin.seqNum = tmp32;

View File

@ -412,9 +412,18 @@ int SlaveDBRMNode::bulkSetHWMAndCP(const vector<BulkSetHWMArg>& hwmArgs,
if (setCPDataArgs.size() > 0)
{
for (i = 0; i < setCPDataArgs.size(); i++)
{
setCPEntry.isBinaryColumn = setCPDataArgs[i].isBinaryColumn;
if (!setCPEntry.isBinaryColumn)
{
setCPEntry.max = setCPDataArgs[i].max;
setCPEntry.min = setCPDataArgs[i].min;
}
else
{
setCPEntry.bigMax = setCPDataArgs[i].bigMax;
setCPEntry.bigMin = setCPDataArgs[i].bigMin;
}
setCPEntry.seqNum = setCPDataArgs[i].seqNum;
bulkSetCPMap[setCPDataArgs[i].firstLbid] = setCPEntry;
}
@ -428,8 +437,17 @@ int SlaveDBRMNode::bulkSetHWMAndCP(const vector<BulkSetHWMArg>& hwmArgs,
for (i = 0; i < mergeCPDataArgs.size(); i++)
{
mergeCPEntry.type = mergeCPDataArgs[i].type;
mergeCPEntry.colWidth = mergeCPDataArgs[i].colWidth;
if (mergeCPDataArgs[i].colWidth <= 8)
{
mergeCPEntry.max = mergeCPDataArgs[i].max;
mergeCPEntry.min = mergeCPDataArgs[i].min;
}
else
{
mergeCPEntry.bigMax = mergeCPDataArgs[i].bigMax;
mergeCPEntry.bigMin = mergeCPDataArgs[i].bigMin;
}
mergeCPEntry.newExtent = mergeCPDataArgs[i].newExtent;
mergeCPEntry.seqNum = mergeCPDataArgs[i].seqNum;
bulkMergeCPMap[mergeCPDataArgs[i].startLbid] = mergeCPEntry;

View File

@ -294,6 +294,7 @@ void BRMReporter::sendHWMToFile( )
//------------------------------------------------------------------------------
// Send Casual Partition update information to BRM
//------------------------------------------------------------------------------
// TODO MCOL-641
void BRMReporter::sendCPToFile( )
{
if (fCPInfo.size() > 0)

View File

@ -301,6 +301,7 @@ void BulkLoadBuffer::convert(char* field, int fieldLength,
int32_t iDate;
char charTmpBuf[MAX_COLUMN_BOUNDARY + 1] = {0};
long long llVal = 0, llDate = 0;
__int128 bigllVal = 0;
uint64_t tmp64;
uint32_t tmp32;
uint8_t ubiVal;
@ -938,7 +939,9 @@ void BulkLoadBuffer::convert(char* field, int fieldLength,
// BIG INT
//----------------------------------------------------------------------
case WriteEngine::WR_LONGLONG:
case WriteEngine::WR_BINARY:
{
// TODO MCOL-641 Add full support here.
bool bSatVal = false;
if ( column.dataType != CalpontSystemCatalog::DATETIME &&
@ -982,12 +985,19 @@ void BulkLoadBuffer::convert(char* field, int fieldLength,
if ( (column.dataType == CalpontSystemCatalog::DECIMAL) ||
(column.dataType == CalpontSystemCatalog::UDECIMAL))
{
if (width <= 8)
{
// errno is initialized and set in convertDecimalString
llVal = Convertor::convertDecimalString(
field, fieldLength, column.scale );
}
else
{
dataconvert::atoi128(string(field), bigllVal);
}
}
else
{
errno = 0;
llVal = strtoll( field, 0, 10 );
@ -1016,6 +1026,8 @@ void BulkLoadBuffer::convert(char* field, int fieldLength,
bufStats.satCount++;
// Update min/max range
if (width <= 8)
{
if (llVal < bufStats.minBufferVal)
bufStats.minBufferVal = llVal;
@ -1024,6 +1036,17 @@ void BulkLoadBuffer::convert(char* field, int fieldLength,
pVal = &llVal;
}
else
{
if (bigllVal < bufStats.bigMinBufferVal)
bufStats.bigMinBufferVal = bigllVal;
if (bigllVal > bufStats.bigMaxBufferVal)
bufStats.bigMaxBufferVal = bigllVal;
pVal = &bigllVal;
}
}
else if (column.dataType == CalpontSystemCatalog::TIME)
{
// time conversion
@ -1650,12 +1673,25 @@ int BulkLoadBuffer::parseCol(ColumnInfo& columnInfo)
// Update CP min/max if this is last row in this extent
if ( (fStartRowParser + i) == lastInputRowInExtent )
{
if (columnInfo.column.width <= 8)
{
columnInfo.updateCPInfo( lastInputRowInExtent,
bufStats.minBufferVal,
bufStats.maxBufferVal,
columnInfo.column.dataType );
columnInfo.column.dataType,
columnInfo.column.width );
}
else
{
columnInfo.updateCPInfo( lastInputRowInExtent,
bufStats.bigMinBufferVal,
bufStats.bigMaxBufferVal,
columnInfo.column.dataType,
columnInfo.column.width );
}
// TODO MCOL-641 Add support here.
if (fLog->isDebug( DEBUG_2 ))
{
ostringstream oss;
@ -1674,26 +1710,54 @@ int BulkLoadBuffer::parseCol(ColumnInfo& columnInfo)
lastInputRowInExtent += columnInfo.rowsPerExtent();
if (isUnsigned(columnInfo.column.dataType) || isCharType(columnInfo.column.dataType))
{
if (columnInfo.column.width <= 8)
{
bufStats.minBufferVal = static_cast<int64_t>(MAX_UBIGINT);
bufStats.maxBufferVal = static_cast<int64_t>(MIN_UBIGINT);
}
else
{
bufStats.bigMinBufferVal = -1;
bufStats.bigMaxBufferVal = 0;
}
updateCPInfoPendingFlag = false;
}
else
{
if (columnInfo.column.width <= 8)
{
bufStats.minBufferVal = MAX_BIGINT;
bufStats.maxBufferVal = MIN_BIGINT;
}
else
{
dataconvert::DataConvert::int128Max(bufStats.bigMinBufferVal);
dataconvert::DataConvert::int128Min(bufStats.bigMaxBufferVal);
}
updateCPInfoPendingFlag = false;
}
}
}
if (updateCPInfoPendingFlag)
{
if (columnInfo.column.width <= 8)
{
columnInfo.updateCPInfo( lastInputRowInExtent,
bufStats.minBufferVal,
bufStats.maxBufferVal,
columnInfo.column.dataType );
columnInfo.column.dataType,
columnInfo.column.width );
}
else
{
columnInfo.updateCPInfo( lastInputRowInExtent,
bufStats.bigMinBufferVal,
bufStats.bigMaxBufferVal,
columnInfo.column.dataType,
columnInfo.column.width );
}
}
if (bufStats.satCount) // @bug 3504: increment row saturation count
@ -1724,6 +1788,7 @@ int BulkLoadBuffer::parseCol(ColumnInfo& columnInfo)
Stats::stopParseEvent(WE_STATS_PARSE_COL);
#endif
// TODO MCOL-641 Add support here.
if (fLog->isDebug( DEBUG_2 ))
{
ostringstream oss;

View File

@ -30,6 +30,7 @@
#include "boost/ptr_container/ptr_vector.hpp"
#include "we_columninfo.h"
#include "calpontsystemcatalog.h"
#include "dataconvert.h"
namespace WriteEngine
{
@ -42,17 +43,31 @@ public:
int64_t minBufferVal;
int64_t maxBufferVal;
int64_t satCount;
union
{
__int128 bigMinBufferVal;
int64_t minBufferVal_;
};
union
{
__int128 bigMaxBufferVal;
int64_t maxBufferVal_;
};
BLBufferStats(ColDataType colDataType) : satCount(0)
{
if (isUnsigned(colDataType) || isCharType(colDataType))
{
minBufferVal = static_cast<int64_t>(MAX_UBIGINT);
maxBufferVal = static_cast<int64_t>(MIN_UBIGINT);
bigMinBufferVal = -1;
bigMaxBufferVal = 0;
}
else
{
minBufferVal = MAX_BIGINT;
maxBufferVal = MIN_BIGINT;
dataconvert::DataConvert::int128Max(bigMinBufferVal);
dataconvert::DataConvert::int128Min(bigMaxBufferVal);
}
}
};

View File

@ -70,10 +70,11 @@ void ColExtInf::addFirstEntry( RID lastInputRow,
// buffer is flushed), so we will not have an LBID for the 1st buffer for this
// extent.
//------------------------------------------------------------------------------
void ColExtInf::addOrUpdateEntry( RID lastInputRow,
int64_t minVal,
int64_t maxVal,
ColDataType colDataType )
template <typename T>
void ColExtInf::addOrUpdateEntryTemplate( RID lastInputRow,
T minVal, T maxVal,
ColDataType colDataType,
int width )
{
boost::mutex::scoped_lock lock(fMapMutex);
@ -91,14 +92,27 @@ void ColExtInf::addOrUpdateEntry( RID lastInputRow,
// If all rows had null value for this column, then minVal will be
// MAX_INT and maxVal will be MIN_INT (see getCPInfoForBRM()).
if (iter->second.fMinVal == LLONG_MIN) // init the range
__int128 bigMinValInit;
dataconvert::DataConvert::int128Max(bigMinValInit);
if ((iter->second.fMinVal == LLONG_MIN && width <= 8) ||
(iter->second.fbigMinVal == bigMinValInit && width > 8)) // init the range
{
if (width <= 8)
{
iter->second.fMinVal = minVal;
iter->second.fMaxVal = maxVal;
}
else
{
iter->second.fbigMinVal = minVal;
iter->second.fbigMaxVal = maxVal;
}
}
else // Update the range
{
if (isUnsigned(colDataType) || isCharType(colDataType))
{
if (width <= 8)
{
if (static_cast<uint64_t>(minVal)
< static_cast<uint64_t>(iter->second.fMinVal))
@ -109,6 +123,19 @@ void ColExtInf::addOrUpdateEntry( RID lastInputRow,
iter->second.fMaxVal = maxVal;
}
else
{
if (static_cast<unsigned __int128>(minVal)
< static_cast<unsigned __int128>(iter->second.fbigMinVal))
iter->second.fbigMinVal = minVal;
if (static_cast<unsigned __int128>(maxVal)
> static_cast<unsigned __int128>(iter->second.fbigMaxVal))
iter->second.fbigMaxVal = maxVal;
}
}
else
{
if (width <= 8)
{
if (minVal < iter->second.fMinVal)
iter->second.fMinVal = minVal;
@ -116,6 +143,15 @@ void ColExtInf::addOrUpdateEntry( RID lastInputRow,
if (maxVal > iter->second.fMaxVal)
iter->second.fMaxVal = maxVal;
}
else
{
if (minVal < iter->second.fbigMinVal)
iter->second.fbigMinVal = minVal;
if (maxVal > iter->second.fbigMaxVal)
iter->second.fbigMaxVal = maxVal;
}
}
}
}
}
@ -178,6 +214,8 @@ void ColExtInf::getCPInfoForBRM( JobColumn column, BRMReporter& brmReporter )
// if applicable (indicating an extent with no non-NULL values).
int64_t minVal = iter->second.fMinVal;
int64_t maxVal = iter->second.fMaxVal;
__int128 bigMinVal = iter->second.fbigMinVal;
__int128 bigMaxVal = iter->second.fbigMaxVal;
if ( bIsChar )
{
@ -198,6 +236,7 @@ void ColExtInf::getCPInfoForBRM( JobColumn column, BRMReporter& brmReporter )
// Log for now; may control with debug flag later
//if (fLog->isDebug( DEBUG_1 ))
// TODO MCOL-641 Add support here.
{
std::ostringstream oss;
oss << "Saving CP update for OID-" << fColOid <<
@ -232,11 +271,20 @@ void ColExtInf::getCPInfoForBRM( JobColumn column, BRMReporter& brmReporter )
BRM::CPInfoMerge cpInfoMerge;
cpInfoMerge.startLbid = iter->second.fLbid;
if (column.width <= 8)
{
cpInfoMerge.max = maxVal;
cpInfoMerge.min = minVal;
}
else
{
cpInfoMerge.bigMax = bigMaxVal;
cpInfoMerge.bigMin = bigMinVal;
}
cpInfoMerge.seqNum = -1; // Not used by mergeExtentsMaxMin
cpInfoMerge.type = column.dataType;
cpInfoMerge.newExtent = iter->second.fNewExtent;
cpInfoMerge.colWidth = column.width;
brmReporter.addToCPInfo( cpInfoMerge );
++iter;

View File

@ -40,6 +40,7 @@
#include "brmtypes.h"
#include "we_type.h"
#include "dataconvert.h"
namespace WriteEngine
{
@ -62,14 +63,22 @@ public:
ColExtInfEntry() : fLbid(INVALID_LBID),
fMinVal(LLONG_MIN),
fMaxVal(LLONG_MIN),
fNewExtent(true) { }
fNewExtent(true)
{
dataconvert::DataConvert::int128Min(fbigMaxVal);
dataconvert::DataConvert::int128Max(fbigMinVal);
}
// Used to create entry for an existing extent we are going to add data to.
ColExtInfEntry(BRM::LBID_t lbid, bool bIsNewExtent) :
fLbid(lbid),
fMinVal(LLONG_MIN),
fMaxVal(LLONG_MIN),
fNewExtent(bIsNewExtent) { }
fNewExtent(bIsNewExtent)
{
dataconvert::DataConvert::int128Min(fbigMaxVal);
dataconvert::DataConvert::int128Max(fbigMinVal);
}
// Used to create entry for a new extent, with LBID not yet allocated
ColExtInfEntry(int64_t minVal, int64_t maxVal) :
@ -78,6 +87,13 @@ public:
fMaxVal(maxVal),
fNewExtent(true) { }
// Used to create entry for a new extent, with LBID not yet allocated
ColExtInfEntry(__int128 bigMinVal, __int128 bigMaxVal) :
fLbid(INVALID_LBID),
fNewExtent(true),
fbigMinVal(bigMinVal),
fbigMaxVal(bigMaxVal) { }
// Used to create entry for a new extent, with LBID not yet allocated
ColExtInfEntry(uint64_t minVal, uint64_t maxVal) :
fLbid(INVALID_LBID),
@ -85,10 +101,27 @@ public:
fMaxVal(static_cast<int64_t>(maxVal)),
fNewExtent(true) { }
// Used to create entry for a new extent, with LBID not yet allocated
ColExtInfEntry(unsigned __int128 bigMinVal, unsigned __int128 bigMaxVal) :
fLbid(INVALID_LBID),
fNewExtent(true),
fbigMinVal(static_cast<__int128>(bigMinVal)),
fbigMaxVal(static_cast<__int128>(bigMaxVal)) { }
BRM::LBID_t fLbid; // LBID for an extent; should be the starting LBID
int64_t fMinVal; // minimum value for extent associated with LBID
int64_t fMaxVal; // maximum value for extent associated with LBID
bool fNewExtent;// is this a new extent
union
{
__int128 fbigMinVal;
int64_t fMinVal_;
};
union
{
__int128 fbigMaxVal;
int64_t fMaxVal_;
};
};
//------------------------------------------------------------------------------
@ -122,7 +155,14 @@ public:
virtual void addOrUpdateEntry( RID lastInputRow,
int64_t minVal,
int64_t maxVal,
ColDataType colDataType ) { }
ColDataType colDataType,
int width ) { }
virtual void addOrUpdateEntry( RID lastInputRow,
__int128 minVal,
__int128 maxVal,
ColDataType colDataType,
int width ) { }
virtual void getCPInfoForBRM ( JobColumn column,
BRMReporter& brmReporter) { }
@ -173,10 +213,33 @@ public:
* @param minVal Minimum value for the latest buffer read
* @param maxVal Maximum value for the latest buffer read
*/
template <typename T>
void addOrUpdateEntryTemplate( RID lastInputRow,
T minVal, T maxVal,
ColDataType colDataType,
int width );
virtual void addOrUpdateEntry( RID lastInputRow,
int64_t minVal,
int64_t maxVal,
ColDataType colDataType );
int64_t minVal, int64_t maxVal,
ColDataType colDataType,
int width )
{
addOrUpdateEntryTemplate( lastInputRow,
minVal, maxVal,
colDataType,
width );
}
virtual void addOrUpdateEntry( RID lastInputRow,
__int128 minVal, __int128 maxVal,
ColDataType colDataType,
int width )
{
addOrUpdateEntryTemplate( lastInputRow,
minVal, maxVal,
colDataType,
width );
}
/** @brief Send updated Casual Partition (CP) info to BRM.
*/

View File

@ -294,10 +294,11 @@ public:
/** @brief Update extent CP information
*/
template <typename T>
void updateCPInfo( RID lastInputRow,
int64_t minVal,
int64_t maxVal,
ColDataType colDataType );
T minVal, T maxVal,
ColDataType colDataType,
int width );
/** @brief Setup initial extent we will begin loading at start of import.
* @param dbRoot DBRoot of starting extent
@ -567,13 +568,15 @@ inline unsigned ColumnInfo::rowsPerExtent()
return fRowsPerExtent;
}
template <typename T>
inline void ColumnInfo::updateCPInfo(
RID lastInputRow,
int64_t minVal,
int64_t maxVal,
ColDataType colDataType )
T minVal,
T maxVal,
ColDataType colDataType,
int width )
{
fColExtInf->addOrUpdateEntry( lastInputRow, minVal, maxVal, colDataType );
fColExtInf->addOrUpdateEntry( lastInputRow, minVal, maxVal, colDataType, width );
}
} // end of namespace

View File

@ -649,7 +649,7 @@ inline int BRMWrapper::bulkSetHWMAndCP(
inline int BRMWrapper::setExtentsMaxMin(const BRM::CPInfoList_t& cpinfoList)
{
int rc = blockRsltnMgrPtr->setExtentsMaxMin( cpinfoList );
int rc = blockRsltnMgrPtr->setExtentsMaxMin(cpinfoList);
return getRC( rc, ERR_BRM_SET_EXTENTS_CP );
}

View File

@ -39,6 +39,7 @@ using namespace std;
using namespace execplan;
#include "dataconvert.h"
#include "IDBDataFile.h"
#include "IDBPolicy.h"
@ -1036,8 +1037,18 @@ int ColumnOp::fillColumn(const TxnID& txnid, Column& column, Column& refCol, voi
}
}
cpInfo.isBinaryColumn = column.colWidth > 8;
if (!cpInfo.isBinaryColumn)
{
cpInfo.max = nextValStart + nexValNeeded - 1;
cpInfo.min = nextValStart;
}
else
{
cpInfo.bigMax = nextValStart + nexValNeeded - 1;
cpInfo.bigMin = nextValStart;
}
cpInfo.seqNum = 0;
}
@ -1092,6 +1103,10 @@ int ColumnOp::fillColumn(const TxnID& txnid, Column& column, Column& refCol, voi
}
}
cpInfo.isBinaryColumn = column.colWidth > 8;
if (!cpInfo.isBinaryColumn)
{
if (isUnsigned(column.colDataType))
{
cpInfo.max = 0;
@ -1102,6 +1117,20 @@ int ColumnOp::fillColumn(const TxnID& txnid, Column& column, Column& refCol, voi
cpInfo.max = numeric_limits<int64_t>::min();
cpInfo.min = numeric_limits<int64_t>::max();
}
}
else
{
if (isUnsigned(column.colDataType))
{
cpInfo.bigMax = 0;
cpInfo.min = -1;
}
else
{
dataconvert::DataConvert::int128Min(cpInfo.bigMax);
dataconvert::DataConvert::int128Max(cpInfo.bigMin);
}
}
cpInfo.seqNum = -1;
}
@ -1127,17 +1156,34 @@ int ColumnOp::fillColumn(const TxnID& txnid, Column& column, Column& refCol, voi
if (autoincrement) //@Bug 4074. Mark it invalid first to set later
{
BRM::CPInfo cpInfo1;
cpInfo1.isBinaryColumn = column.colWidth > 8;
if (!cpInfo1.isBinaryColumn)
{
if (isUnsigned(column.colDataType))
{
cpInfo1.max = 0;
cpInfo1.min = static_cast<int64_t>(numeric_limits<int64_t>::max());
cpInfo1.min = static_cast<int64_t>(numeric_limits<uint64_t>::max());
}
else
{
cpInfo1.max = numeric_limits<int64_t>::min();
cpInfo1.min = numeric_limits<int64_t>::max();
}
}
else
{
if (isUnsigned(column.colDataType))
{
cpInfo1.bigMax = 0;
cpInfo1.bigMin = -1;
}
else
{
dataconvert::DataConvert::int128Min(cpInfo1.bigMax);
dataconvert::DataConvert::int128Max(cpInfo1.bigMin);
}
}
cpInfo1.seqNum = -1;
cpInfo1.firstLbid = startLbid;
@ -1152,13 +1198,11 @@ int ColumnOp::fillColumn(const TxnID& txnid, Column& column, Column& refCol, voi
BRM::CPInfoList_t cpinfoList;
cpInfo.firstLbid = startLbid;
cpinfoList.push_back(cpInfo);
//cout << "calling setExtentsMaxMin for startLbid = " << startLbid << endl;
rc = BRMWrapper::getInstance()->setExtentsMaxMin(cpinfoList);
if ( rc != NO_ERROR)
return rc;
//cout << "calling setLocalHWM for oid:hwm = " << column.dataFile.fid <<":"<<colHwm << endl;
rc = BRMWrapper::getInstance()->setLocalHWM((OID)column.dataFile.fid, column.dataFile.fPartition,
column.dataFile.fSegment, colHwm);

View File

@ -57,6 +57,7 @@ using namespace execplan;
#include "IDBPolicy.h"
#include "MonitorProcMem.h"
using namespace idbdatafile;
#include "dataconvert.h"
#ifdef _MSC_VER
#define isnan _isnan
@ -1536,19 +1537,6 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
BRM::CPInfoList_t cpinfoList;
BRM::CPInfo cpInfo;
if (isUnsigned(colStructList[i].colDataType))
{
cpInfo.max = 0;
cpInfo.min = static_cast<int64_t>(numeric_limits<uint64_t>::max());
}
else
{
cpInfo.max = numeric_limits<int64_t>::min();
cpInfo.min = numeric_limits<int64_t>::max();
}
cpInfo.seqNum = -1;
for ( i = 0; i < extents.size(); i++)
{
colOp = m_colOp[op(colStructList[i].fCompressionType)];
@ -1562,6 +1550,37 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
if (rc != NO_ERROR)
return rc;
cpInfo.isBinaryColumn = colStructList[i].colWidth > 8;
if (!cpInfo.isBinaryColumn)
{
if (isUnsigned(colStructList[i].colDataType))
{
cpInfo.max = 0;
cpInfo.min = static_cast<int64_t>(numeric_limits<uint64_t>::max());
}
else
{
cpInfo.max = numeric_limits<int64_t>::min();
cpInfo.min = numeric_limits<int64_t>::max();
}
}
else
{
if (isUnsigned(colStructList[i].colDataType))
{
cpInfo.bigMax = 0;
cpInfo.bigMin = -1;
}
else
{
dataconvert::DataConvert::int128Min(cpInfo.bigMax);
dataconvert::DataConvert::int128Max(cpInfo.bigMin);
}
}
cpInfo.seqNum = -1;
//mark the extents to invalid
cpInfo.firstLbid = extents[i].startLbid;
cpinfoList.push_back(cpInfo);
@ -2278,19 +2297,6 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
BRM::CPInfoList_t cpinfoList;
BRM::CPInfo cpInfo;
if (isUnsigned(colStructList[i].colDataType))
{
cpInfo.max = 0;
cpInfo.min = static_cast<int64_t>(numeric_limits<uint64_t>::max());
}
else
{
cpInfo.max = numeric_limits<int64_t>::min();
cpInfo.min = numeric_limits<int64_t>::max();
}
cpInfo.seqNum = -1;
for ( i = 0; i < extents.size(); i++)
{
colOp = m_colOp[op(colStructList[i].fCompressionType)];
@ -2304,6 +2310,37 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
if (rc != NO_ERROR)
return rc;
cpInfo.isBinaryColumn = colStructList[i].colWidth > 8;
if (!cpInfo.isBinaryColumn)
{
if (isUnsigned(colStructList[i].colDataType))
{
cpInfo.max = 0;
cpInfo.min = static_cast<int64_t>(numeric_limits<uint64_t>::max());
}
else
{
cpInfo.max = numeric_limits<int64_t>::min();
cpInfo.min = numeric_limits<int64_t>::max();
}
}
else
{
if (isUnsigned(colStructList[i].colDataType))
{
cpInfo.bigMax = 0;
cpInfo.bigMin = -1;
}
else
{
dataconvert::DataConvert::int128Min(cpInfo.bigMax);
dataconvert::DataConvert::int128Max(cpInfo.bigMin);
}
}
cpInfo.seqNum = -1;
//mark the extents to invalid
cpInfo.firstLbid = extents[i].startLbid;
cpinfoList.push_back(cpInfo);