1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-3536 Collation

This commit is contained in:
David Hall
2020-06-01 15:08:15 -05:00
parent 516a3fa37e
commit 78ac310e42
14 changed files with 126 additions and 262 deletions

View File

@ -107,6 +107,7 @@ void PredicateOperator::unserialize(messageqcpp::ByteStream& b)
ObjectReader::checkType(b, ObjectReader::PREDICATEOPERATOR);
//b >> fData;
Operator::unserialize(b);
cs = get_charset(fOperationType.charsetNumber, MYF(MY_WME));
}
bool PredicateOperator::operator==(const PredicateOperator& t) const
@ -307,6 +308,9 @@ void PredicateOperator::setOpType(Type& l, Type& r)
r.colDataType == execplan::CalpontSystemCatalog::VARCHAR ||
r.colDataType == execplan::CalpontSystemCatalog::TEXT))
{
#if 0
// Currently, STRINT isn't properly implemented everywhere
// For short strings, we can get a faster execution for charset that fit in one byte.
if ( ( (l.colDataType == execplan::CalpontSystemCatalog::CHAR && l.colWidth <= 8) ||
(l.colDataType == execplan::CalpontSystemCatalog::VARCHAR && l.colWidth < 8) ) &&
( (r.colDataType == execplan::CalpontSystemCatalog::CHAR && r.colWidth <= 8) ||
@ -334,6 +338,7 @@ void PredicateOperator::setOpType(Type& l, Type& r)
}
}
else
#endif
{
fOperationType.colDataType = execplan::CalpontSystemCatalog::VARCHAR;
fOperationType.colWidth = 255;
@ -345,15 +350,48 @@ void PredicateOperator::setOpType(Type& l, Type& r)
fOperationType.colDataType = execplan::CalpontSystemCatalog::LONGDOUBLE;
fOperationType.colWidth = sizeof(long double);
}
/*
else
{
fOperationType.colDataType = execplan::CalpontSystemCatalog::DOUBLE;
fOperationType.colWidth = 8;
}
*/
cs = get_charset(fOperationType.charsetNumber, MYF(MY_WME));
}
inline bool PredicateOperator::strTrimCompare(const std::string& op1, const std::string& op2)
{
int r1 = cs->strnncollsp(op1.c_str(), op1.length(), op2.c_str(), op2.length());
switch (fOp)
{
case OP_EQ:
return r1 == 0;
case OP_NE:
return r1 != 0;
case OP_GT:
return r1 > 0;
case OP_GE:
return r1 >= 0;
case OP_LT:
return r1 < 0;
case OP_LE:
return r1 <= 0;
default:
{
std::ostringstream oss;
oss << "Unsupported predicate operation: " << fOp;
throw logging::InvalidOperationExcept(oss.str());
}
}
}
bool PredicateOperator::getBoolVal(rowgroup::Row& row, bool& isNull, ReturnedColumn* lop, ReturnedColumn* rop)
{
// like operator. both sides are string.
@ -730,12 +768,8 @@ bool PredicateOperator::getBoolVal(rowgroup::Row& row, bool& isNull, ReturnedCol
const std::string& val1 = lop->getStrVal(row, isNull);
if (isNull)
return false;
const std::string& val2 = rop->getStrVal(row, isNull);
cs->strnncollsp(val1.c_str(), val1.length(), val2.c_str(), val2.length());
// return strTrimCompare(val1, rop->getStrVal(row, isNull), fOperationType.charsetNumber) && !isNull;
// return strCompare(val1, rop->getStrVal(row, isNull)) && !isNull;
return strTrimCompare(val1, rop->getStrVal(row, isNull)) && !isNull;
}
//FIXME: ???

View File

@ -121,6 +121,7 @@ public:
private:
template <typename result_t>
inline bool numericCompare(result_t op1, result_t op2);
inline bool strTrimCompare(const std::string& op1, const std::string& op2);
const CHARSET_INFO* cs;
};

View File

@ -413,8 +413,8 @@ void BatchPrimitiveProcessorJL::addElementType(const StringElementType& et, uint
void BatchPrimitiveProcessorJL::getElementTypes(ByteStream& in,
vector<ElementType>* out, bool* validCPData, uint64_t* lbid, int64_t* min,
int64_t* max, uint32_t* cachedIO, uint32_t* physIO, uint32_t* touchedBlocks,
uint16_t* preJoinRidCount) const
int64_t* max, uint32_t* cachedIO, uint32_t* physIO,
uint32_t* touchedBlocks) const
{
uint32_t i;
uint16_t l_count;
@ -425,11 +425,6 @@ void BatchPrimitiveProcessorJL::getElementTypes(ByteStream& in,
uint64_t tmp64;
uint8_t tmp8;
/* PM join support */
uint32_t jCount;
ElementType* jet;
// cout << "get Element Types uniqueID=" << uniqueID << endl;
/* skip the header */
idbassert(in.length() > sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
in.advance(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
@ -472,21 +467,6 @@ void BatchPrimitiveProcessorJL::getElementTypes(ByteStream& in,
(*out)[i].second = vals[i];
}
if (joiner.get() != NULL)
{
in >> *preJoinRidCount;
in >> jCount;
idbassert(in.length() > (jCount << 4));
jet = (ElementType*) in.buf();
for (i = 0; i < jCount; ++i)
out->push_back(jet[i]);
in.advance(jCount << 4);
}
else
*preJoinRidCount = l_count;
in >> *cachedIO;
in >> *physIO;
in >> *touchedBlocks;
@ -987,7 +967,7 @@ void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const
if (needRidsAtDelivery)
flags |= SEND_RIDS_AT_DELIVERY;
if (joiner.get() != NULL || tJoiners.size() > 0)
if (tJoiners.size() > 0)
flags |= HAS_JOINER;
if (sendRowGroups)
@ -1090,11 +1070,6 @@ void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const
// cout << "joined RG: " << joinedRG.toString() << endl;
}
}
else
{
bs << (uint8_t) joiner->includeAll();
bs << (uint32_t) joiner->size();
}
}
bs << filterCount;
@ -1581,51 +1556,6 @@ bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)
return true;
}
void BatchPrimitiveProcessorJL::useJoiner(boost::shared_ptr<joiner::Joiner> j)
{
pos = 0;
joiner = j;
}
bool BatchPrimitiveProcessorJL::nextJoinerMsg(ByteStream& bs)
{
uint32_t size, toSend;
ISMPacketHeader ism;
memset((void*)&ism, 0, sizeof(ism));
if (smallSide.get() == NULL)
smallSide = joiner->getSmallSide();
size = smallSide->size();
if (pos == size)
{
/* last message */
ism.Command = BATCH_PRIMITIVE_END_JOINER;
bs.load((uint8_t*) &ism, sizeof(ism));
bs << (messageqcpp::ByteStream::quadbyte)sessionID;
bs << (messageqcpp::ByteStream::quadbyte)stepID;
bs << uniqueID;
pos = 0;
return false;
}
ism.Command = BATCH_PRIMITIVE_ADD_JOINER;
bs.load((uint8_t*) &ism, sizeof(ism));
bs << (messageqcpp::ByteStream::quadbyte)sessionID;
bs << (messageqcpp::ByteStream::quadbyte)stepID;
bs << uniqueID;
toSend = (size - pos > 1000000 ? 1000000 : size - pos);
bs << toSend;
bs << pos;
bs.append((uint8_t*) (&(*smallSide)[pos]), sizeof(ElementType) * toSend);
pos += toSend;
return true;
}
void BatchPrimitiveProcessorJL::setProjectionRowGroup(const rowgroup::RowGroup& rg)
{
ot = ROW_GROUP;
@ -1761,8 +1691,6 @@ void BatchPrimitiveProcessorJL::deliverStringTableRowGroup(bool b)
aggregateRGPM.setUseStringTable(b);
else if (fe2)
fe2Output.setUseStringTable(b);
// else if ((joiner.get() != NULL || tJoiners.size() > 0) && sendTupleJoinRowGroupData)
// joinedRG.setUseStringTable(b);
else
projectionRG.setUseStringTable(b);
}

View File

@ -126,9 +126,6 @@ public:
void createBPP(messageqcpp::ByteStream&) const;
void destroyBPP(messageqcpp::ByteStream&) const;
void useJoiner(boost::shared_ptr<joiner::Joiner>);
bool nextJoinerMsg(messageqcpp::ByteStream&);
/* Call this one last */
// void addDeliveryStep(const DeliveryStep &);
@ -154,8 +151,7 @@ public:
/* Turn a ByteStream into ElementTypes or StringElementTypes */
void getElementTypes(messageqcpp::ByteStream& in, std::vector<ElementType>* out,
bool* validCPData, uint64_t* lbid, int64_t* min, int64_t* max, uint32_t* cachedIO, uint32_t* physIO,
uint32_t* touchedBlocks,
uint16_t* preJoinRidCount) const;
uint32_t* touchedBlocks) const;
void getStringElementTypes(messageqcpp::ByteStream& in,
std::vector<StringElementType>* out, bool* validCPData, uint64_t* lbid,
int64_t* min, int64_t* max, uint32_t* cachedIO, uint32_t* physIO,
@ -314,7 +310,6 @@ private:
/* for Joiner serialization */
bool pickNextJoinerNum();
uint32_t pos, joinerNum;
boost::shared_ptr<joiner::Joiner> joiner;
boost::shared_ptr<std::vector<ElementType> > smallSide;
boost::scoped_array<uint32_t> posByJoinerNum;

View File

@ -134,7 +134,6 @@ public:
{
return fAlias;
}
void useJoiner(boost::shared_ptr<joiner::Joiner>) {}
void setJobInfo(const JobInfo* jobInfo) {}
void setOutputRowGroup(const rowgroup::RowGroup&);
const rowgroup::RowGroup& getOutputRowGroup() const;

View File

@ -925,7 +925,7 @@ public:
{
return fOutType;
}
void getOutputType(BPSOutputType ot)
void setOutputType(BPSOutputType ot)
{
fOutType = ot;
}
@ -1062,7 +1062,6 @@ public:
virtual bool wasStepRun() const = 0;
virtual BPSOutputType getOutputType() const = 0;
virtual uint64_t getRows() const = 0;
virtual void useJoiner(boost::shared_ptr<joiner::Joiner>) = 0;
virtual void setJobInfo(const JobInfo* jobInfo) = 0;
virtual void setOutputRowGroup(const rowgroup::RowGroup& rg) = 0;
virtual const rowgroup::RowGroup& getOutputRowGroup() const = 0;
@ -1244,7 +1243,6 @@ public:
{
return uniqueID;
}
void useJoiner(boost::shared_ptr<joiner::Joiner>);
void useJoiner(boost::shared_ptr<joiner::TupleJoiner>);
void useJoiners(const std::vector<boost::shared_ptr<joiner::TupleJoiner> >&);
bool wasStepRun() const

View File

@ -2774,10 +2774,6 @@ void TupleBPS::useJoiners(const vector<boost::shared_ptr<joiner::TupleJoiner> >&
fBPP->useJoiners(tjoiners);
}
void TupleBPS::useJoiner(boost::shared_ptr<joiner::Joiner> j)
{
}
void TupleBPS::newPMOnline(uint32_t connectionNumber)
{
ByteStream bs;

View File

@ -3412,7 +3412,8 @@ ReturnedColumn* buildReturnedColumn(
if (rc && item->name.length)
rc->alias(item->name.str);
rc->charsetNumber(item->collation.collation->number);
if (rc)
rc->charsetNumber(item->collation.collation->number);
return rc;
}

View File

@ -393,15 +393,6 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
// cout << "got the joined Rowgroup: " << joinedRG.toString() << "\n";
}
}
else
{
bs >> tmp8;
bs >> joinerSize;
joiner.reset(new Joiner((bool) tmp8));
// going to use just one lock for this old style, probably not used, join
addToJoinerLocks.reset(new boost::scoped_array<boost::mutex>[1]);
addToJoinerLocks[0].reset(new boost::mutex[1]);
}
#ifdef __FreeBSD__
pthread_mutex_unlock(&objLock);
@ -786,19 +777,6 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
*/
}
}
else
{
joblist::ElementType *et = (joblist::ElementType*) bs.buf();
boost::mutex::scoped_lock lk(addToJoinerLocks[0][0]);
for (i = 0; i < count; i++)
{
// cout << "BPP: adding <" << et[i].first << ", " << et[i].second << "> to Joiner\n";
joiner->insert(et[i]);
}
bs.advance(count << 4);
}
idbassert(bs.length() == 0);
}
@ -838,38 +816,35 @@ int BatchPrimitiveProcessor::endOfJoiner()
return 0;
}
if (ot == ROW_GROUP)
for (i = 0; i < joinerCount; i++)
for (i = 0; i < joinerCount; i++)
{
if (!typelessJoin[i])
{
if (!typelessJoin[i])
{
currentSize = 0;
for (uint j = 0; j < processorThreads; ++j)
if (!tJoiners[i] || !tJoiners[i][j])
return -1;
else
currentSize += tJoiners[i][j]->size();
if (currentSize != tJoinerSizes[i])
currentSize = 0;
for (uint j = 0; j < processorThreads; ++j)
if (!tJoiners[i] || !tJoiners[i][j])
return -1;
//if ((!tJoiners[i] || tJoiners[i]->size() != tJoinerSizes[i]))
// return -1;
}
else
{
currentSize = 0;
for (uint j = 0; j < processorThreads; ++j)
if (!tlJoiners[i] || !tlJoiners[i][j])
return -1;
else
currentSize += tlJoiners[i][j]->size();
if (currentSize != tJoinerSizes[i])
return -1;
//if ((!tJoiners[i] || tlJoiners[i]->size() != tJoinerSizes[i]))
// return -1;
}
else
currentSize += tJoiners[i][j]->size();
if (currentSize != tJoinerSizes[i])
return -1;
//if ((!tJoiners[i] || tJoiners[i]->size() != tJoinerSizes[i]))
// return -1;
}
else if (joiner.get() == NULL || joiner->size() != joinerSize)
return -1;
else
{
currentSize = 0;
for (uint j = 0; j < processorThreads; ++j)
if (!tlJoiners[i] || !tlJoiners[i][j])
return -1;
else
currentSize += tlJoiners[i][j]->size();
if (currentSize != tJoinerSizes[i])
return -1;
//if ((!tJoiners[i] || tlJoiners[i]->size() != tJoinerSizes[i]))
// return -1;
}
}
endOfJoinerRan = true;
@ -1115,26 +1090,6 @@ void BatchPrimitiveProcessor::initProcessor()
asyncLoaded.reset(new bool[projectCount + 1]);
}
void BatchPrimitiveProcessor::executeJoin()
{
uint32_t newRowCount, i;
preJoinRidCount = ridCount;
newRowCount = 0;
smallSideMatches.clear();
for (i = 0; i < ridCount; i++)
{
if (joiner->getNewMatches(values[i], &smallSideMatches))
{
values[newRowCount] = values[i];
relRids[newRowCount++] = relRids[i];
}
}
ridCount = newRowCount;
}
/* This version does a join on projected rows */
void BatchPrimitiveProcessor::executeTupleJoin()
{
@ -1143,7 +1098,6 @@ void BatchPrimitiveProcessor::executeTupleJoin()
uint64_t largeKey;
TypelessData tlLargeKey;
preJoinRidCount = ridCount;
outputRG.getRow(0, &oldRow);
outputRG.getRow(0, &newRow);
@ -1513,17 +1467,6 @@ void BatchPrimitiveProcessor::execute()
stopwatch->start("BatchPrimitiveProcessor::execute third part");
#endif
if (doJoin && ot != ROW_GROUP)
{
#ifdef PRIMPROC_STOPWATCH
stopwatch->start("- executeJoin");
executeJoin();
stopwatch->stop("- executeJoin");
#else
executeJoin();
#endif
}
if (projectCount > 0 || ot == ROW_GROUP)
{
#ifdef PRIMPROC_STOPWATCH
@ -2058,17 +2001,6 @@ void BatchPrimitiveProcessor::serializeElementTypes()
*serialized << ridCount;
serialized->append((uint8_t*) relRids, ridCount << 1);
serialized->append((uint8_t*) values, ridCount << 3);
/* Send the small side matches if there was a join */
if (doJoin)
{
uint32_t ssize = smallSideMatches.size();
*serialized << preJoinRidCount;
*serialized << (uint32_t) ssize;
if (ssize > 0)
serialized->append((uint8_t*) &smallSideMatches[0], ssize << 4);
}
}
void BatchPrimitiveProcessor::serializeStrings()
@ -2403,51 +2335,44 @@ SBPP BatchPrimitiveProcessor::duplicate()
if (doJoin)
{
pthread_mutex_lock(&bpp->objLock);
bpp->joinerSize = joinerSize;
/* There are add'l join vars, but only these are necessary for processing
a join */
bpp->tJoinerSizes = tJoinerSizes;
bpp->joinerCount = joinerCount;
bpp->joinTypes = joinTypes;
bpp->largeSideKeyColumns = largeSideKeyColumns;
bpp->tJoiners = tJoiners;
//bpp->_pools = _pools;
bpp->typelessJoin = typelessJoin;
bpp->tlLargeSideKeyColumns = tlLargeSideKeyColumns;
bpp->tlJoiners = tlJoiners;
bpp->tlKeyLengths = tlKeyLengths;
bpp->storedKeyAllocators = storedKeyAllocators;
bpp->joinNullValues = joinNullValues;
bpp->doMatchNulls = doMatchNulls;
bpp->hasJoinFEFilters = hasJoinFEFilters;
bpp->hasSmallOuterJoin = hasSmallOuterJoin;
if (ot == ROW_GROUP)
if (hasJoinFEFilters)
{
/* There are add'l join vars, but only these are necessary for processing
a join */
bpp->tJoinerSizes = tJoinerSizes;
bpp->joinerCount = joinerCount;
bpp->joinTypes = joinTypes;
bpp->largeSideKeyColumns = largeSideKeyColumns;
bpp->tJoiners = tJoiners;
//bpp->_pools = _pools;
bpp->typelessJoin = typelessJoin;
bpp->tlLargeSideKeyColumns = tlLargeSideKeyColumns;
bpp->tlJoiners = tlJoiners;
bpp->tlKeyLengths = tlKeyLengths;
bpp->storedKeyAllocators = storedKeyAllocators;
bpp->joinNullValues = joinNullValues;
bpp->doMatchNulls = doMatchNulls;
bpp->hasJoinFEFilters = hasJoinFEFilters;
bpp->hasSmallOuterJoin = hasSmallOuterJoin;
bpp->joinFERG = joinFERG;
bpp->joinFEFilters.reset(new scoped_ptr<FuncExpWrapper>[joinerCount]);
if (hasJoinFEFilters)
{
bpp->joinFERG = joinFERG;
bpp->joinFEFilters.reset(new scoped_ptr<FuncExpWrapper>[joinerCount]);
for (i = 0; i < joinerCount; i++)
if (joinFEFilters[i])
bpp->joinFEFilters[i].reset(new FuncExpWrapper(*joinFEFilters[i]));
}
if (getTupleJoinRowGroupData)
{
bpp->smallSideRGs = smallSideRGs;
bpp->largeSideRG = largeSideRG;
bpp->smallSideRowLengths = smallSideRowLengths;
bpp->smallSideRowData = smallSideRowData;
bpp->smallNullRowData = smallNullRowData;
bpp->smallNullPointers = smallNullPointers;
bpp->joinedRG = joinedRG;
}
for (i = 0; i < joinerCount; i++)
if (joinFEFilters[i])
bpp->joinFEFilters[i].reset(new FuncExpWrapper(*joinFEFilters[i]));
}
if (getTupleJoinRowGroupData)
{
bpp->smallSideRGs = smallSideRGs;
bpp->largeSideRG = largeSideRG;
bpp->smallSideRowLengths = smallSideRowLengths;
bpp->smallSideRowData = smallSideRowData;
bpp->smallNullRowData = smallNullRowData;
bpp->smallNullPointers = smallNullPointers;
bpp->joinedRG = joinedRG;
}
else
bpp->joiner = joiner;
#ifdef __FreeBSD__
pthread_mutex_unlock(&bpp->objLock);
@ -2549,10 +2474,6 @@ bool BatchPrimitiveProcessor::operator==(const BatchPrimitiveProcessor& bpp) con
if (*filterSteps[i] != *bpp.filterSteps[i])
return false;
for (i = 0; i < projectCount; i++)
if (*projectSteps[i] != *bpp.projectSteps[i])
return false;
return true;
}
#endif

View File

@ -251,14 +251,9 @@ private:
bool fBusy;
/* Join support TODO: Make join ops a seperate Command class. */
boost::shared_ptr<joiner::Joiner> joiner;
std::vector<joblist::ElementType> smallSideMatches;
bool doJoin;
uint32_t joinerSize;
uint16_t preJoinRidCount;
boost::scoped_array<boost::scoped_array<boost::mutex> > addToJoinerLocks;
boost::scoped_array<boost::mutex> smallSideDataLocks;
void executeJoin();
// uint32_t ridsIn, ridsOut;

View File

@ -696,7 +696,12 @@ int main(int argc, char* argv[])
}
BPPCount = highPriorityThreads + medPriorityThreads + lowPriorityThreads;
// For debug
lowPriorityThreads = 1;
medPriorityThreads = 1;
highPriorityThreads = 1;
BPPCount=1;
// let the user override if they want
temp = toInt(cf->getConfig(primitiveServers, "BPPCount"));

View File

@ -4,7 +4,7 @@ include_directories( ${ENGINE_COMMON_INCLUDES} )
########### next target ###############
set(joiner_LIB_SRCS joiner.cpp tuplejoiner.cpp joinpartition.cpp)
set(joiner_LIB_SRCS tuplejoiner.cpp joinpartition.cpp)
add_library(joiner SHARED ${joiner_LIB_SRCS})

View File

@ -390,7 +390,12 @@ inline void RowAggregation::updateFloatMinMax(float val1, float val2, int64_t co
void RowAggregation::updateStringMinMax(string val1, string val2, int64_t col, int func)
{
CHARSET_INFO* cs = fRowGroupIn.getCharset(col);
if (isNull(fRowGroupOut, fRow, col))
{
fRow.setStringField(val1, col);
return;
}
CHARSET_INFO* cs = fRow.getCharset(col);
int tmp = cs->strnncoll(val1.c_str(), val1.length(), val2.c_str(), val2.length());
if ((tmp < 0 && func == rowgroup::ROWAGG_MIN) ||
@ -1276,19 +1281,9 @@ void RowAggregation::doMinMax(const Row& rowIn, int64_t colIn, int64_t colOut, i
case execplan::CalpontSystemCatalog::VARCHAR:
case execplan::CalpontSystemCatalog::TEXT:
{
int colWidth = fRowGroupIn.getColumnWidth(colIn);
if (colWidth <= 8)
{
uint64_t valIn = rowIn.getUintField(colIn);
uint64_t valOut = fRow.getUintField(colOut);
updateCharMinMax(valIn, valOut, colOut, funcType);
}
else
{
string valIn = rowIn.getStringField(colIn);
string valOut = fRow.getStringField(colOut);
updateStringMinMax(valIn, valOut, colOut, funcType);
}
string valIn = rowIn.getStringField(colIn);
string valOut = fRow.getStringField(colOut);
updateStringMinMax(valIn, valOut, colOut, funcType);
break;
}

View File

@ -1042,14 +1042,10 @@ bool Row::equals(const std::string& val, uint32_t col) const
if (memcmp(getStringPointer(col), val.c_str(), val.length()))
return false;
}
else if (inStringTable(col))
{
uint64_t offset = *((uint64_t*) &data[offsets[col]]);
return strings->equals(val, offset, cs);
}
else
{
return (cs->strnncollsp(val.c_str(), val.length(), (char*)&data[offsets[col]], getColumnWidth(col)) == 0);
return (cs->strnncollsp((char*)getStringPointer(col), getStringLength(col),
val.c_str(), val.length()) == 0);
}
return true;
}