1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-01 06:46:55 +03:00

MCOL-4498 LIKE is not collation aware

This commit is contained in:
Alexander Barkov
2021-02-26 16:16:00 +04:00
parent 5943261bfc
commit 765858bc5b
61 changed files with 1094 additions and 789 deletions

View File

@ -49,49 +49,27 @@ const char* signatureNotFound = joblist::CPSTRNOTFOUND.c_str();
namespace primitives
{
inline bool PrimitiveProcessor::compare(int cmp, uint8_t COP, int len1, int len2) throw()
inline bool PrimitiveProcessor::compare(const datatypes::Charset &cs, uint8_t COP,
const char *str1, size_t length1,
const char *str2, size_t length2) throw()
{
switch (COP)
int error = 0;
bool rc = primitives::StringComparator(cs).op(&error, COP,
ConstString(str1, length1),
ConstString(str2, length2));
if (error)
{
case COMPARE_NIL:
return false;
MessageLog logger(LoggingID(28));
logging::Message::Args colWidth;
Message msg(34);
case COMPARE_LT:
return cmp < 0;
case COMPARE_EQ:
return cmp == 0;
case COMPARE_LE:
return cmp <= 0;
case COMPARE_GT:
return cmp > 0;
case COMPARE_NE:
return cmp != 0;
case COMPARE_GE:
return cmp >= 0;
case COMPARE_LIKE:
return cmp; // is done elsewhere; shouldn't get here. Exception?
case COMPARE_NOT:
return false; // throw an exception here?
default:
MessageLog logger(LoggingID(28));
logging::Message::Args colWidth;
Message msg(34);
colWidth.add(COP);
colWidth.add("compare");
msg.format(colWidth);
logger.logErrorMessage(msg);
return false; // throw an exception here?
colWidth.add(COP);
colWidth.add("compare");
msg.format(colWidth);
logger.logErrorMessage(msg);
return false; // throw an exception here?
}
return rc;
}
/*
@ -109,7 +87,7 @@ void PrimitiveProcessor::p_TokenByScan(const TokenByScanRequestHeader* h,
const uint16_t* offsets;
int offsetIndex, argIndex, argsOffset;
bool cmpResult = false;
int tmp, i, err;
int i;
const char* sig;
uint16_t siglen;
@ -118,8 +96,6 @@ void PrimitiveProcessor::p_TokenByScan(const TokenByScanRequestHeader* h,
int rdvOffset;
uint8_t* niceRet; // ret cast to a byte-indexed type
boost::scoped_array<idb_regex_t> regex;
// set up pointers to fields within each structure
// either retTokens or retDataValues will be used but not both.
@ -144,36 +120,8 @@ void PrimitiveProcessor::p_TokenByScan(const TokenByScanRequestHeader* h,
niceBlock = reinterpret_cast<const uint8_t*>(block);
offsets = reinterpret_cast<const uint16_t*>(&niceBlock[10]);
niceInput = reinterpret_cast<const uint8_t*>(h);
const CHARSET_INFO* cs = & datatypes::Charset(h->charsetNumber).getCharset();
// if LIKE is an operator, compile regexp's in advance.
if ((h->NVALS > 0 && h->COP1 & COMPARE_LIKE) ||
(h->NVALS == 2 && h->COP2 & COMPARE_LIKE))
{
regex.reset(new idb_regex_t[h->NVALS]);
for (i = 0, argsOffset = sizeof(TokenByScanRequestHeader); i < h->NVALS; i++)
{
p_DataValue pdvTmp;
args = reinterpret_cast<const DataValue*>(&niceInput[argsOffset]);
pdvTmp.len = args->len;
pdvTmp.data = (const uint8_t*) args->data;
err = convertToRegexp(&regex[i], &pdvTmp);
if (err != 0)
{
MessageLog logger(LoggingID(28));
Message msg(37);
logger.logErrorMessage(msg);
return;
}
argsOffset += sizeof(uint16_t) + args->len;
}
}
const datatypes::Charset cs(h->charsetNumber);
for (offsetIndex = 1; offsets[offsetIndex] != 0xffff; offsetIndex++)
{
@ -186,7 +134,7 @@ void PrimitiveProcessor::p_TokenByScan(const TokenByScanRequestHeader* h,
if (eqFilter)
{
if (cs != & eqFilter->getCharset())
if (& cs.getCharset() != & eqFilter->getCharset())
{
//throw runtime_error("Collations mismatch: TokenByScanRequestHeader and DicEqualityFilter");
}
@ -200,22 +148,7 @@ void PrimitiveProcessor::p_TokenByScan(const TokenByScanRequestHeader* h,
goto no_store;
}
if (h->COP1 & COMPARE_LIKE)
{
p_DataValue dv;
dv.len = siglen;
dv.data = (uint8_t*) sig;
cmpResult = isLike(&dv, &regex[argIndex]);
if (h->COP1 & COMPARE_NOT)
cmpResult = !cmpResult;
}
else
{
tmp = cs->strnncollsp(sig, siglen, args->data, args->len);
cmpResult = compare(tmp, h->COP1, siglen, args->len);
}
cmpResult = compare(cs, h->COP1, sig, siglen, args->data, args->len);
switch (h->NVALS)
{
@ -240,23 +173,7 @@ void PrimitiveProcessor::p_TokenByScan(const TokenByScanRequestHeader* h,
argIndex++;
args = (DataValue*) &niceInput[argsOffset];
if (h->COP2 & COMPARE_LIKE)
{
p_DataValue dv;
dv.len = siglen;
dv.data = (uint8_t*) sig;
cmpResult = isLike(&dv, &regex[argIndex]);
if (h->COP2 & COMPARE_NOT)
cmpResult = !cmpResult;
}
else
{
tmp = cs->strnncollsp(sig, siglen, args->data, args->len);
cmpResult = compare(tmp, h->COP2, siglen, args->len);
}
cmpResult = compare(cs, h->COP2, sig, siglen, args->data, args->len);
if (cmpResult)
goto store;
@ -266,25 +183,10 @@ void PrimitiveProcessor::p_TokenByScan(const TokenByScanRequestHeader* h,
default:
{
idbassert(0);
for (i = 0, cmpResult = true; i < h->NVALS; i++)
{
if (h->COP1 & COMPARE_LIKE)
{
p_DataValue dv;
dv.len = siglen;
dv.data = (uint8_t*) sig;
cmpResult = isLike(&dv, &regex[argIndex]);
if (h->COP1 & COMPARE_NOT)
cmpResult = !cmpResult;
}
else
{
tmp = cs->strnncollsp(sig, siglen, args->data, args->len);
cmpResult = compare(tmp, h->COP2, siglen, args->len);
}
cmpResult = compare(cs, h->COP1, sig, siglen, args->data, args->len);
if (!cmpResult && h->BOP == BOP_AND)
goto no_store;
@ -507,144 +409,6 @@ again:
}
const char backslash = '\\';
inline bool PrimitiveProcessor::isEscapedChar(char c)
{
return ('%' == c || '_' == c);
}
//FIXME: copy/pasted to dataconvert.h: refactor
int PrimitiveProcessor::convertToRegexp(idb_regex_t* regex, const p_DataValue* str)
{
//In the worst case, every char is quadrupled, plus some leading/trailing cruft...
char* cBuf = new char[(4 * str->len) + 3];
char c;
int i, cBufIdx = 0;
// translate to regexp symbols
cBuf[cBufIdx++] = '^'; // implicit leading anchor
for (i = 0; i < str->len; i++)
{
c = (char) str->data[i];
switch (c)
{
// chars to substitute
case '%':
cBuf[cBufIdx++] = '.';
cBuf[cBufIdx++] = '*';
break;
case '_':
cBuf[cBufIdx++] = '.';
break;
// escape the chars that are special in regexp's but not in SQL
// default special characters in perl: .[{}()\*+?|^$
case '.':
case '*':
case '^':
case '$':
case '?':
case '+':
case '|':
case '[':
case ']':
case '{':
case '}':
case '(':
case ')':
cBuf[cBufIdx++] = backslash;
cBuf[cBufIdx++] = c;
break;
case backslash: //this is the sql escape char
if ( i + 1 < str->len)
{
if (isEscapedChar(str->data[i + 1]))
{
cBuf[cBufIdx++] = str->data[++i];
break;
}
else if (backslash == str->data[i + 1])
{
cBuf[cBufIdx++] = c;
cBuf[cBufIdx++] = str->data[++i];
break;
}
} //single slash
cBuf[cBufIdx++] = backslash;
cBuf[cBufIdx++] = c;
break;
default:
cBuf[cBufIdx++] = c;
}
}
cBuf[cBufIdx++] = '$'; // implicit trailing anchor
cBuf[cBufIdx++] = '\0';
#ifdef VERBOSE
cerr << "regexified string is " << cBuf << endl;
#endif
#ifdef POSIX_REGEX
regcomp(&regex->regex, cBuf, REG_NOSUB | REG_EXTENDED);
#else
regex->regex = cBuf;
#endif
regex->used = true;
delete [] cBuf;
return 0;
}
bool PrimitiveProcessor::isLike(const p_DataValue* dict, const idb_regex_t* regex) throw()
{
#ifdef POSIX_REGEX
char* cBuf = new char[dict->len + 1];
memcpy(cBuf, dict->data, dict->len);
cBuf[dict->len] = '\0';
bool ret = (regexec(&regex->regex, cBuf, 0, NULL, 0) == 0);
delete [] cBuf;
return ret;
#else
/* Note, the passed-in pointers are effectively begin() and end() iterators */
return regex_match(dict->data, dict->data + dict->len, regex->regex);
#endif
}
boost::shared_array<idb_regex_t>
PrimitiveProcessor::makeLikeFilter (const DictFilterElement* filterString, uint32_t count)
{
boost::shared_array<idb_regex_t> ret;
uint32_t filterIndex, filterOffset;
uint8_t* in8 = (uint8_t*) filterString;
const DictFilterElement* filter;
p_DataValue filterptr = {0, NULL};
for (filterIndex = 0, filterOffset = 0; filterIndex < count; filterIndex++)
{
filter = reinterpret_cast<const DictFilterElement*>(&in8[filterOffset]);
if (filter->COP & COMPARE_LIKE)
{
if (!ret)
ret.reset(new idb_regex_t[count]);
filterptr.len = filter->len;
filterptr.data = filter->data;
convertToRegexp(&ret[filterIndex], &filterptr);
}
filterOffset += sizeof(DictFilterElement) + filter->len;
}
return ret;
}
void PrimitiveProcessor::p_Dictionary(const DictInput* in,
vector<uint8_t>* out,
@ -662,7 +426,7 @@ void PrimitiveProcessor::p_Dictionary(const DictInput* in,
uint16_t aggCount;
bool cmpResult;
DictOutput header;
const CHARSET_INFO* cs = & datatypes::Charset(charsetNumber).getCharset();
const datatypes::Charset &cs(charsetNumber);
// default size of the ouput to something sufficiently large to prevent
// excessive reallocation and copy when resizing
@ -704,7 +468,7 @@ void PrimitiveProcessor::p_Dictionary(const DictInput* in,
// len == 0 indicates this is the first pass
if (max.len != 0)
{
tmp = cs->strnncollsp(sigptr.data, sigptr.len, max.data, max.len);
tmp = cs.strnncollsp(sigptr.data, sigptr.len, max.data, max.len);
if (tmp > 0)
max = sigptr;
@ -714,7 +478,7 @@ void PrimitiveProcessor::p_Dictionary(const DictInput* in,
if (min.len != 0)
{
tmp = cs->strnncollsp(sigptr.data, sigptr.len, min.data, min.len);
tmp = cs.strnncollsp(sigptr.data, sigptr.len, min.data, min.len);
if (tmp < 0)
min = sigptr;
@ -748,18 +512,9 @@ void PrimitiveProcessor::p_Dictionary(const DictInput* in,
{
filter = reinterpret_cast<const DictFilterElement*>(&in8[filterOffset]);
if (filter->COP & COMPARE_LIKE)
{
cmpResult = isLike(&sigptr, &parsedLikeFilter[filterIndex]);
if (filter->COP & COMPARE_NOT)
cmpResult = !cmpResult;
}
else
{
tmp = cs->strnncollsp(sigptr.data, sigptr.len, filter->data, filter->len);
cmpResult = compare(tmp, filter->COP, sigptr.len, filter->len);
}
cmpResult = compare(cs, filter->COP,
(const char *) sigptr.data, sigptr.len,
(const char *) filter->data, filter->len);
if (!cmpResult && in->BOP != BOP_OR)
goto no_store;