1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

A cleanup for MCOL-4064 Make JOIN collation aware

A non-JOIN condition like `WHERE c1=c2` (with c1 and c2 being columns of the
same table) was not collation-aware yet after the main patches for MCOL-4064.

Additionally fixing StrFilterCmd::compare*() to address this.
This commit is contained in:
Alexander Barkov
2020-12-08 13:10:51 +04:00
parent 23df62e337
commit b08d719593
4 changed files with 112 additions and 126 deletions

45
datatypes/mcs_string.h Normal file
View File

@ -0,0 +1,45 @@
/* Copyright (C) 2020 MariaDB Corporation.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#ifndef MCS_DATATYPES_STRING_H
#define MCS_DATATYPES_STRING_H
#include "conststring.h"
namespace datatypes
{
/**
 * A short character value packed into a single 64-bit integer.
 *
 * The eight bytes of the integer are treated as character data, read in
 * memory order starting at the address of the stored value.
 */
class TCharShort
{
    int64_t mValue; // raw packed bytes of the string
public:
    /** Wraps the packed 64-bit representation; no validation is performed. */
    TCharShort(int64_t value)
        : mValue(value)
    { }

    /**
     * Exposes the packed bytes as a non-owning string view.
     *
     * The view covers all bytes of mValue and is then passed through
     * ConstString::rtrimZero() (presumably dropping trailing zero padding
     * -- confirm against conststring.h).
     *
     * NOTE: the returned view points into this object's own storage, so it
     * must not outlive the TCharShort instance it was obtained from.
     */
    explicit operator utils::ConstString() const
    {
        utils::ConstString view(reinterpret_cast<const char *>(&mValue),
                                sizeof(mValue));
        return view.rtrimZero();
    }
};
} // namespace datatypes
#endif // MCS_DATATYPES_STRING_H

View File

@ -28,6 +28,7 @@
#include "filtercommand.h"
#include "dataconvert.h"
#include "mcs_decimal.h"
#include "mcs_string.h"
using namespace std;
using namespace messageqcpp;
@ -566,42 +567,60 @@ void StrFilterCmd::setCompareFunc(uint32_t columns)
}
// TODO:
// Move this function into class Charset as a method
// and reuse it in:
// - colCompareStr() in primitives/linux-port/column.cpp
// - compareStr() in dbcon/joblist/lbidlist.cpp
//
// Note: the COMPARE_XXX constants should first be moved into
// a globally visible enum, e.g. in utils/common/mcs_basic_types.h,
// and all "fBOP" members in all classes should be changed to this enum.
/**
 * Evaluates a binary comparison operator on two strings using the
 * ordering supplied by the given character set.
 *
 * @param cs    character set whose strnncollsp() provides the ordering
 * @param s0    left-hand operand
 * @param s1    right-hand operand
 * @param fBOP  one of the COMPARE_GT/LT/EQ/GE/LE/NE operator codes
 * @return true when "s0 <fBOP> s1" holds under cs; false otherwise,
 *         including when fBOP is not one of the codes listed above
 */
static inline bool compareString(const datatypes::Charset &cs,
                                 const utils::ConstString &s0,
                                 const utils::ConstString &s1,
                                 uint8_t fBOP)
{
    // Negative, zero or positive depending on how s0 sorts relative to s1.
    const int order = cs.strnncollsp(s0, s1);

    // Unrecognised operator codes leave the result at false.
    bool matched = false;

    switch (fBOP)
    {
        case COMPARE_EQ: matched = (order == 0); break;
        case COMPARE_NE: matched = (order != 0); break;
        case COMPARE_LT: matched = (order <  0); break;
        case COMPARE_LE: matched = (order <= 0); break;
        case COMPARE_GT: matched = (order >  0); break;
        case COMPARE_GE: matched = (order >= 0); break;
        default:         break;
    }

    return matched;
}
/**
 * Compares two short string values that are both stored inline as 64-bit
 * integers in bpp->fFiltCmdValues.
 *
 * The comparison is collation-aware: both operands are compared with
 * compareString() using the character set of the left column
 * (leftColType.getCharset()).
 *
 * @param i  index of the left operand in bpp->fFiltCmdValues[0]
 * @param j  index of the right operand in bpp->fFiltCmdValues[1]
 * @return the result of applying fBOP to the two values; false when
 *         either operand is NULL or fBOP is not a known operator
 */
bool StrFilterCmd::compare_cc(uint64_t i, uint64_t j)
{
    // A NULL on either side never satisfies the filter.
    if (execplan::isNull(bpp->fFiltCmdValues[0][i], leftColType) ||
        execplan::isNull(bpp->fFiltCmdValues[1][j], rightColType))
        return false;

    // The previous byte-wise switch over uint64ToStr() returned on every
    // path, which left the collation-aware comparison below unreachable.
    // It is removed so that the column collation is actually honoured.
    datatypes::Charset cs(leftColType.getCharset());

    // s0/s1 must stay alive while the ConstString views created from them
    // are in use, because those views point into the TCharShort objects.
    datatypes::TCharShort s0(bpp->fFiltCmdValues[0][i]);
    datatypes::TCharShort s1(bpp->fFiltCmdValues[1][j]);

    return compareString(cs,
                         static_cast<utils::ConstString>(s0),
                         static_cast<utils::ConstString>(s1),
                         fBOP);
}
@ -611,36 +630,10 @@ bool StrFilterCmd::compare_ss(uint64_t i, uint64_t j)
bpp->fFiltStrValues[0][i] == joblist::CPNULLSTRMARK || bpp->fFiltStrValues[1][j] == joblist::CPNULLSTRMARK)
return false;
switch (fBOP)
{
case COMPARE_GT:
return bpp->fFiltStrValues[0][i] > bpp->fFiltStrValues[1][j];
break;
case COMPARE_LT:
return bpp->fFiltStrValues[0][i] < bpp->fFiltStrValues[1][j];
break;
case COMPARE_EQ:
return bpp->fFiltStrValues[0][i] == bpp->fFiltStrValues[1][j];
break;
case COMPARE_GE:
return bpp->fFiltStrValues[0][i] >= bpp->fFiltStrValues[1][j];
break;
case COMPARE_LE:
return bpp->fFiltStrValues[0][i] <= bpp->fFiltStrValues[1][j];
break;
case COMPARE_NE:
return bpp->fFiltStrValues[0][i] != bpp->fFiltStrValues[1][j];
break;
default:
return false;
break;
}
datatypes::Charset cs(leftColType.getCharset());
utils::ConstString s0(utils::ConstString(bpp->fFiltStrValues[0][i]));
utils::ConstString s1(utils::ConstString(bpp->fFiltStrValues[1][j]));
return compareString(cs, s0, s1, fBOP);
}
@ -650,39 +643,10 @@ bool StrFilterCmd::compare_cs(uint64_t i, uint64_t j)
bpp->fFiltStrValues[1][j] == "" || bpp->fFiltStrValues[1][j] == joblist::CPNULLSTRMARK)
return false;
int cmp = strncmp(reinterpret_cast<const char*>(&bpp->fFiltCmdValues[0][i]),
bpp->fFiltStrValues[1][j].c_str(), fCharLength);
switch (fBOP)
{
case COMPARE_GT:
return (cmp > 0);
break;
case COMPARE_LT:
return (cmp < 0 || (cmp == 0 && fCharLength < bpp->fFiltStrValues[1][j].length()));
break;
case COMPARE_EQ:
return (cmp == 0 && fCharLength >= bpp->fFiltStrValues[1][j].length());
break;
case COMPARE_GE:
return (cmp > 0 || (cmp == 0 && fCharLength >= bpp->fFiltStrValues[1][j].length()));
break;
case COMPARE_LE:
return (cmp <= 0);
break;
case COMPARE_NE:
return (cmp != 0 || fCharLength < bpp->fFiltStrValues[1][j].length());
break;
default:
return false;
break;
}
datatypes::Charset cs(leftColType.getCharset());
datatypes::TCharShort s0(bpp->fFiltCmdValues[0][i]);
utils::ConstString s1(bpp->fFiltStrValues[1][j]);
return compareString(cs, static_cast<utils::ConstString>(s0), s1, fBOP);
}
@ -692,39 +656,10 @@ bool StrFilterCmd::compare_sc(uint64_t i, uint64_t j)
execplan::isNull(bpp->fFiltCmdValues[1][j], rightColType))
return false;
int cmp = strncmp(bpp->fFiltStrValues[0][i].c_str(),
reinterpret_cast<const char*>(&bpp->fFiltCmdValues[1][j]), fCharLength);
switch (fBOP)
{
case COMPARE_GT:
return (cmp > 0 || (cmp == 0 && bpp->fFiltStrValues[0][i].length() > fCharLength));
break;
case COMPARE_LT:
return (cmp < 0);
break;
case COMPARE_EQ:
return (cmp == 0 && bpp->fFiltStrValues[0][i].length() <= fCharLength);
break;
case COMPARE_GE:
return (cmp >= 0);
break;
case COMPARE_LE:
return (cmp < 0 || (cmp == 0 && bpp->fFiltStrValues[0][i].length() <= fCharLength));
break;
case COMPARE_NE:
return (cmp != 0 || bpp->fFiltStrValues[0][i].length() > fCharLength);
break;
default:
return false;
break;
}
datatypes::Charset cs(leftColType.getCharset());
utils::ConstString s0(bpp->fFiltStrValues[0][i]);
datatypes::TCharShort s1(bpp->fFiltCmdValues[1][j]);
return compareString(cs, s0, static_cast<utils::ConstString>(s1), fBOP);
}

View File

@ -117,6 +117,9 @@ protected:
const struct charset_info_st * mCharset;
public:
Charset(CHARSET_INFO & cs) :mCharset(&cs) { }
// Builds a Charset from a pointer that may be null; a null pointer
// selects my_charset_bin so that mCharset is never left null.
Charset(CHARSET_INFO *cs)
    : mCharset(cs != nullptr ? cs : &my_charset_bin)
{ }
Charset(uint32_t charsetNumber);
CHARSET_INFO & getCharset() const { return *mCharset; }
uint32_t hash(const char *data, uint64_t len) const

View File

@ -31,6 +31,9 @@ public:
ConstString(const char *str, size_t length)
:mStr(str), mLength(length)
{ }
// Creates a non-owning view over the contents of a std::string.
// The view stores str's data pointer and length, so it must not be used
// after str is destroyed or modified in a way that reallocates its buffer.
explicit ConstString(const std::string &str)
    : mStr(str.data()),
      mLength(str.size())
{ }
const char *str() const { return mStr; }
size_t length() const { return mLength; }
ConstString & rtrimZero()