1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
Sergey Zefirov b53c231ca6 MCOL-271 empty strings should not be NULLs (#2794)
This patch improves handling of NULLs in textual fields in ColumnStore.
Previously empty strings were considered NULLs and it could be a problem
if data scheme allows for empty strings. It was also one of major
reasons of behavior difference between ColumnStore and other engines in
MariaDB family.

Also, this patch fixes some other bugs and incorrect behavior, for
example, incorrect comparison for "column <= ''" which evaluates to
constant True for all purposes before this patch.
2023-03-30 21:18:29 +03:00

713 lines
19 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
//
// $Id: dictstep.cpp 2110 2013-06-19 15:51:38Z bwilkinson $
// C++ Implementation: dictstep
//
// Description:
//
//
// Author: Patrick LeBlanc <pleblanc@calpont.com>, (C) 2008
//
// Copyright: See COPYING file that comes with this distribution
//
//
#include <unistd.h>
#include <algorithm>
#include "bpp.h"
#include "primitiveserver.h"
#include "pp_logger.h"
#include "../linux-port/primitiveprocessor.h"
using namespace std;
using namespace messageqcpp;
using namespace rowgroup;
namespace primitiveprocessor
{
extern uint32_t dictBufferSize;
DictStep::DictStep() : Command(DICT_STEP), strValues(NULL), filterCount(0), bufferSize(0)
{
fMinMax[0] = MAX_UBIGINT;
fMinMax[1] = MIN_UBIGINT;
}
DictStep::~DictStep()
{
}
DictStep& DictStep::operator=(const DictStep& d)
{
// Not all fields are copied for the sake of efficiency.
// Right now, these only need to copy fields deserialized from the create msg.
BOP = d.BOP;
fFilterFeeder = d.fFilterFeeder;
compressionType = d.compressionType;
filterString = d.filterString;
hasEqFilter = d.hasEqFilter;
eqFilter = d.eqFilter;
eqOp = d.eqOp;
filterCount = d.filterCount;
charsetNumber = d.charsetNumber;
fMinMax[0] = d.fMinMax[0];
fMinMax[1] = d.fMinMax[1];
return *this;
}
void DictStep::createCommand(ByteStream& bs)
{
uint8_t tmp8;
bs.advance(1);
bs >> BOP;
bs >> tmp8;
compressionType = tmp8;
bs >> charsetNumber;
bs >> filterCount;
bs >> tmp8;
hasEqFilter = tmp8;
if (hasEqFilter)
{
string strTmp;
datatypes::Charset cs(charsetNumber);
eqFilter.reset(new primitives::DictEqualityFilter(cs));
bs >> eqOp;
for (uint32_t i = 0; i < filterCount; i++)
{
bs >> strTmp;
eqFilter->insert(strTmp);
}
}
else
bs >> filterString;
Command::createCommand(bs);
}
void DictStep::resetCommand(ByteStream& bs)
{
}
void DictStep::prep(int8_t outputType, bool makeAbsRids)
{
// (at most there are 8192 tokens to fetch)
bufferSize = sizeof(DictInput) + filterString.length() + (8192 * sizeof(OldGetSigParams));
inputMsg.reset(new uint8_t[bufferSize]);
primMsg = (DictInput*)inputMsg.get();
primMsg->ism.Interleave = 0;
primMsg->ism.Flags = 0;
primMsg->ism.Command = DICT_SIGNATURE;
primMsg->ism.Size = bufferSize;
primMsg->ism.Type = 2;
primMsg->hdr.SessionID = bpp->sessionID;
// primMsg->hdr.StatementID = 0;
primMsg->hdr.TransactionID = bpp->txnID;
primMsg->hdr.VerID = bpp->versionInfo.currentScn;
primMsg->hdr.StepID = bpp->stepID;
primMsg->BOP = BOP;
primMsg->InputFlags = 1; // TODO: Use the new p_Dict functionality
primMsg->OutputType =
(eqFilter || filterCount || fFilterFeeder != NOT_FEEDER ? OT_RID : OT_RID | OT_DATAVALUE);
primMsg->NOPS = (eqFilter ? 0 : filterCount);
primMsg->NVALS = 0;
}
void DictStep::issuePrimitive(bool isFilter)
{
bool wasCached;
uint32_t blocksRead;
if (!(primMsg->LBID & 0x8000000000000000LL))
{
// std::cerr << "DS issuePrimitive lbid: " << (uint64_t)primMsg->LBID << endl;
primitiveprocessor::loadBlock(primMsg->LBID, bpp->versionInfo, bpp->txnID, compressionType,
bpp->blockData, &wasCached, &blocksRead, bpp->LBIDTrace, bpp->sessionID);
if (wasCached)
bpp->cachedIO++;
bpp->physIO += blocksRead;
bpp->touchedBlocks++;
}
#if !defined(XXX_PRIMITIVES_TOKEN_RANGES_XXX)
bpp->pp.p_Dictionary(primMsg, &result, isFilter, charsetNumber, eqFilter, eqOp);
#else
bpp->pp.p_Dictionary(primMsg, &result, isFilter, charsetNumber, eqFilter, eqOp, fMinMax);
#endif
}
void DictStep::copyResultToTmpSpace(OrderedToken* ot)
{
uint32_t i;
uint8_t* pos;
uint16_t len;
uint64_t rid64;
uint16_t rid16;
idbassert(primMsg->OutputType & OT_RID);
DictOutput* header = (DictOutput*)&result[0];
if (header->NVALS == 0)
return;
pos = &result[sizeof(DictOutput)];
for (i = 0; i < header->NVALS; i++)
{
rid64 = *((uint64_t*)pos);
pos += 8;
rid16 = rid64 & 0x1fff;
ot[rid16].inResult = true;
tmpResultCounter++;
if (primMsg->OutputType & OT_DATAVALUE)
{
NullString ns;
uint8_t isnull = *pos;
pos += 1;
len = *((uint16_t*)pos);
pos += 2;
if (!isnull) {
ns.assign(pos, len);
}
pos += len;
ot[rid16].str = ns;
//if (rid64 & 0x8000000000000000LL)
// ot[rid16].str = joblist::CPNULLSTRMARK;
}
}
}
void DictStep::copyResultToFinalPosition(OrderedToken* ot)
{
uint32_t i, resultPos = 0;
for (i = 0; i < inputRidCount; i++)
{
if (ot[i].inResult)
{
bpp->absRids[resultPos] = ot[i].rid;
bpp->relRids[resultPos] = ot[i].rid - bpp->baseRid;
if (primMsg->OutputType & OT_DATAVALUE)
(*strValues)[resultPos] = ot[i].str;
resultPos++;
}
}
}
void DictStep::processResult()
{
uint32_t i;
uint8_t* pos;
uint16_t len;
DictOutput* header = (DictOutput*)&result[0];
if (header->NVALS == 0)
return;
pos = &result[sizeof(DictOutput)];
for (i = 0; i < header->NVALS; i++, tmpResultCounter++)
{
if (primMsg->OutputType & OT_RID)
{
bpp->absRids[tmpResultCounter] = *((uint64_t*)pos);
pos += 8;
// bpp->relRids[tmpResultCounter] = bpp->absRids[tmpResultCounter] & 0x1fff;
bpp->relRids[tmpResultCounter] = bpp->absRids[tmpResultCounter] - bpp->baseRid;
}
if (primMsg->OutputType & OT_DATAVALUE)
{
uint8_t isnull = *pos;
pos += 1;
NullString ns;
len = *((uint16_t*)pos);
pos += 2;
if (!isnull)
{
ns.assign(pos, len);
}
pos += len;
(*strValues)[tmpResultCounter] = ns;
}
// cout << " stored " << (*strValues)[tmpResultCounter] << endl;
/* XXXPAT: disclaimer: this is how we do it in DictionaryStep; don't know
if it's necessary or not yet */
bpp->absRids[tmpResultCounter] &= 0x7FFFFFFFFFFFFFFFLL;
}
}
void DictStep::projectResult(string* strings)
{
uint32_t i;
uint8_t* pos;
uint16_t len;
DictOutput* header = (DictOutput*)&result[0];
if (header->NVALS == 0)
return;
pos = &result[sizeof(DictOutput)];
// cout << "projectResult() l: " << primMsg->LBID << " NVALS: " << header->NVALS << endl;
for (i = 0; i < header->NVALS; i++)
{
len = *((uint16_t*)pos);
pos += 2;
strings[tmpResultCounter++] = string((char*)pos, len);
// cout << "serialized length is " << len << " string is " << strings[tmpResultCounter-1] << " string
// length = " <<
// strings[tmpResultCounter-1].length() << endl;
pos += len;
totalResultLength += len + 4;
}
}
// bug4901 -
// This version of projectResult needs to stay in sync with
// the above. They are separate methods because the
// _projectToRG() method can deal with this optimized version
// where we only need to return the pointer and length. This
// is desirable because it avoids an unnecessary temporary
// string copy. The above version is still needed for the
// _project() method where it has to serialize the totalResultLength
// before starting to serialize strings.
void DictStep::projectResult(StringPtr* strings)
{
uint32_t i;
uint8_t* pos;
uint16_t len;
DictOutput* header = (DictOutput*)&result[0];
if (header->NVALS == 0)
return;
pos = &result[sizeof(DictOutput)];
// cout << "projectResult() l: " << primMsg->LBID << " NVALS: " << header->NVALS << endl;
for (i = 0; i < header->NVALS; i++)
{
uint8_t isnull = *pos;
pos += 1;
len = *((uint16_t*)pos);
pos += 2;
strings[tmpResultCounter++] = StringPtr(isnull ? nullptr : pos, len);
// cout << "serialized length is " << len << " string is " << strings[tmpResultCounter-1] << " string
// length = " << strings[tmpResultCounter-1].length() << endl;
pos += len;
totalResultLength += len + 4;
}
}
void DictStep::execute()
{
if (fFilterFeeder == LEFT_FEEDER)
strValues = &(bpp->fFiltStrValues[0]);
else if (fFilterFeeder == RIGHT_FEEDER)
strValues = &(bpp->fFiltStrValues[1]);
else
strValues = &(bpp->strValues);
_execute();
}
void DictStep::_execute()
{
/* Need to loop over bpp->values, issuing a primitive for each LBID */
uint64_t i;
int64_t l_lbid;
OldGetSigParams* pt;
boost::scoped_array<OrderedToken> newRidList;
// make the OrderedToken list
newRidList.reset(new OrderedToken[bpp->ridCount]);
for (i = 0; i < bpp->ridCount; i++)
{
newRidList[i].rid = bpp->absRids[i];
newRidList[i].token = bpp->values[i];
newRidList[i].pos = i;
}
tmpResultCounter = 0;
i = 0;
while (i < bpp->ridCount)
{
l_lbid = ((int64_t)newRidList[i].token) >> 10;
primMsg->LBID = (l_lbid == -1) ? l_lbid : l_lbid & 0xFFFFFFFFFL;
primMsg->NVALS = 0;
/* When this is used as a filter, the strings can be thrown out. JLF currently
* constructs joblists s.t. only a FilterCommand will use the strings.
*/
primMsg->OutputType = (fFilterFeeder == NOT_FEEDER ? OT_RID : OT_RID | OT_DATAVALUE);
pt = (OldGetSigParams*)(primMsg->tokens);
while (i < bpp->ridCount && ((((int64_t)newRidList[i].token) >> 10) == l_lbid))
{
if (UNLIKELY(l_lbid < 0))
pt[primMsg->NVALS].rid = (fFilterFeeder == NOT_FEEDER ? newRidList[i].rid : i) | 0x8000000000000000LL;
else
pt[primMsg->NVALS].rid = (fFilterFeeder == NOT_FEEDER ? newRidList[i].rid : i);
pt[primMsg->NVALS].offsetIndex = newRidList[i].token & 0x3ff;
idbassert(pt[primMsg->NVALS].offsetIndex != 0);
primMsg->NVALS++;
i++;
}
memcpy(&pt[primMsg->NVALS], filterString.buf(), filterString.length());
issuePrimitive(true);
if (fFilterFeeder == NOT_FEEDER)
processResult();
else
copyResultToTmpSpace(newRidList.get());
}
inputRidCount = bpp->ridCount;
bpp->ridCount = tmpResultCounter;
// check if feeding a filtercommand
if (fFilterFeeder != NOT_FEEDER)
{
sort(&newRidList[0], &newRidList[inputRidCount], PosSorter());
copyResultToFinalPosition(newRidList.get());
copyRidsForFilterCmd();
}
if (fMinMax[0] <= fMinMax[1] && bpp->valuesLBID != 0)
{
bpp->validCPData = true;
bpp->cpDataFromDictScan = true;
bpp->lbidForCP = bpp->valuesLBID;
bpp->maxVal = fMinMax[1];
bpp->minVal = fMinMax[0];
}
// cout << "DS: /_execute()\n";
}
/* This will do the same thing as execute() but put the result in bpp->serialized */
void DictStep::_project()
{
/* Need to loop over bpp->values, issuing a primitive for each LBID */
uint32_t i;
int64_t l_lbid = 0;
OldGetSigParams* pt;
string tmpStrings[LOGICAL_BLOCK_RIDS];
boost::scoped_array<OrderedToken> newRidList;
// make the OrderedToken list
newRidList.reset(new OrderedToken[bpp->ridCount]);
for (i = 0; i < bpp->ridCount; i++)
{
newRidList[i].rid = bpp->absRids[i];
newRidList[i].token = values[i];
newRidList[i].pos = i;
}
// cout << "DS: _project()\n";
tmpResultCounter = 0;
totalResultLength = 0;
i = 0;
while (i < bpp->ridCount)
{
l_lbid = ((int64_t)newRidList[i].token) >> 10;
primMsg->LBID = (l_lbid == -1) ? l_lbid : l_lbid & 0xFFFFFFFFFL;
primMsg->NVALS = 0;
primMsg->OutputType = OT_DATAVALUE;
pt = (OldGetSigParams*)(primMsg->tokens);
//@bug 972
while (i < bpp->ridCount && ((((int64_t)newRidList[i].token) >> 10) == l_lbid))
{
if (l_lbid < 0)
pt[primMsg->NVALS].rid = newRidList[i].rid | 0x8000000000000000LL;
else
pt[primMsg->NVALS].rid = newRidList[i].rid;
pt[primMsg->NVALS].offsetIndex = newRidList[i].token & 0x3ff;
idbassert(pt[primMsg->NVALS].offsetIndex > 0);
primMsg->NVALS++;
i++;
}
memcpy(&pt[primMsg->NVALS], filterString.buf(), filterString.length());
issuePrimitive(false);
projectResult(tmpStrings);
}
idbassert(tmpResultCounter == bpp->ridCount);
*bpp->serialized << totalResultLength;
// cout << "_project() total length = " << totalResultLength << endl;
for (i = 0; i < tmpResultCounter; i++)
{
// cout << "serializing " << tmpStrings[i] << endl;
*bpp->serialized << tmpStrings[i];
}
// cout << "DS: /_project() l: " << l_lbid << endl;
}
void DictStep::_projectToRG(RowGroup& rg, uint32_t col)
{
/* Need to loop over bpp->values, issuing a primitive for each LBID */
uint32_t i;
int64_t l_lbid = 0;
int64_t o_lbid = 0;
OldGetSigParams* pt;
StringPtr* tmpStrings = new StringPtr[LOGICAL_BLOCK_RIDS];
rowgroup::Row r;
boost::scoped_array<OrderedToken> newRidList;
// make the OrderedToken list
newRidList.reset(new OrderedToken[bpp->ridCount]);
for (i = 0; i < bpp->ridCount; i++)
{
newRidList[i].rid = bpp->absRids[i];
newRidList[i].token = values[i];
newRidList[i].pos = i;
}
sort(&newRidList[0], &newRidList[bpp->ridCount], TokenSorter());
rg.initRow(&r);
uint32_t curResultCounter = 0;
tmpResultCounter = 0;
totalResultLength = 0;
i = 0;
// cout << "DS: projectingToRG rids: " << bpp->ridCount << endl;
while (i < bpp->ridCount)
{
l_lbid = ((int64_t)newRidList[i].token) >> 10;
primMsg->LBID = (l_lbid == -1) ? l_lbid : l_lbid & 0xFFFFFFFFFL;
primMsg->NVALS = 0;
primMsg->OutputType = OT_DATAVALUE;
pt = (OldGetSigParams*)(primMsg->tokens);
//@bug 972
//@bug 1821
while (i < bpp->ridCount && ((((int64_t)newRidList[i].token) >> 10) == l_lbid || l_lbid == -1 ||
((((int64_t)newRidList[i].token) >> 10) & 0x8000000000000000LL)))
{
//@bug 1821
if (newRidList[i].token == 0)
{
ostringstream oss;
ostringstream oss2;
oss << l_lbid;
logging::Message::Args args;
args.add("0");
args.add(oss.str());
oss2 << newRidList[i].rid;
args.add(oss2.str());
primitiveprocessor::mlp->logMessage(logging::M0068, args, true);
newRidList[i].token = 0xfffffffffffffffeLL;
values[newRidList[i].pos] = 0xfffffffffffffffeLL;
}
if ((((int64_t)newRidList[i].token) >> 10) < 0)
{
pt[primMsg->NVALS].rid = newRidList[i].rid | 0x8000000000000000LL;
}
else
{
if ((((int64_t)newRidList[i].token) >> 10) > 0 && o_lbid == 0)
l_lbid = o_lbid = (((int64_t)newRidList[i].token) >> 10);
pt[primMsg->NVALS].rid = newRidList[i].rid;
}
pt[primMsg->NVALS].offsetIndex = newRidList[i].token & 0x3ff;
idbassert(pt[primMsg->NVALS].offsetIndex > 0);
primMsg->NVALS++;
// pt++;
i++;
}
if (((int64_t)primMsg->LBID) < 0 && o_lbid > 0)
primMsg->LBID = o_lbid & 0xFFFFFFFFFL;
memcpy(&pt[primMsg->NVALS], filterString.buf(), filterString.length());
issuePrimitive(false);
projectResult(tmpStrings);
o_lbid = 0;
// cout << "DS: project & issue l: " << (int64_t)primMsg->LBID << " NVALS: " << primMsg->NVALS << endl;
// bug 4901 - move this inside the loop and call incrementally
// to save the unnecessary string copy
if ((rg.getColTypes()[col] != execplan::CalpontSystemCatalog::VARBINARY) &&
(rg.getColTypes()[col] != execplan::CalpontSystemCatalog::BLOB) &&
(rg.getColTypes()[col] != execplan::CalpontSystemCatalog::TEXT))
{
for (i = curResultCounter; i < tmpResultCounter; i++)
{
rg.getRow(newRidList[i].pos, &r);
// std::cerr << "serializing " << tmpStrings[i] << endl;
r.setStringField(tmpStrings[i].getConstString(), col);
}
}
else
{
uint32_t firstTmpResultCounter = tmpResultCounter;
std::map<uint32_t, string*> result;
for (i = curResultCounter; i < firstTmpResultCounter; i++)
{
rg.getRow(newRidList[i].pos, &r);
// If this is a multi-block blob, get all the blocks
// We do string copy here, should maybe have a RowGroup
// function to append strings or something?
// XXX: can NULLs be a valid value for multipart blob?
if (((newRidList[i].token >> 46) < 0x3FFFF) && ((newRidList[i].token >> 46) > 0))
{
StringPtr multi_part[1];
uint16_t old_offset = primMsg->tokens[0].offset;
if (result.empty())
{
// String copy here because tmpStrings pointers will be blown away below
for (uint32_t x = curResultCounter; x < firstTmpResultCounter; x++)
{
result[x] = new string((char*)tmpStrings[x].ptr, tmpStrings[x].len);
}
}
uint64_t origin_lbid = primMsg->LBID;
uint32_t lbid_count = newRidList[i].token >> 46;
primMsg->tokens[0].offset = 1; // first offset of a sig
for (uint32_t j = 1; j <= lbid_count; j++)
{
tmpResultCounter = 0;
primMsg->LBID = origin_lbid + j;
primMsg->NVALS = 1;
primMsg->tokens[0].LBID = origin_lbid + j;
issuePrimitive(false);
projectResult(multi_part);
result[i]->append((char*)multi_part[0].ptr, multi_part[0].len);
}
primMsg->tokens[0].offset = old_offset;
primMsg->LBID = origin_lbid;
tmpResultCounter = firstTmpResultCounter;
r.setVarBinaryField((unsigned char*)result[i]->c_str(), result[i]->length(), col);
delete result[i];
}
else
{
r.setVarBinaryField(tmpStrings[i].ptr, tmpStrings[i].len, col);
}
}
}
curResultCounter = tmpResultCounter;
}
// cout << "_projectToRG() total length = " << totalResultLength << endl;
idbassert(tmpResultCounter == bpp->ridCount);
delete[] tmpStrings;
// cout << "DS: /projectingToRG l: " << (int64_t)primMsg->LBID
// << " len: " << tmpResultCounter
// << endl;
}
void DictStep::project()
{
values = bpp->values;
_project();
}
void DictStep::project(int64_t* vals)
{
values = vals;
_project();
}
void DictStep::projectIntoRowGroup(RowGroup& rg, uint32_t col)
{
values = bpp->values;
_projectToRG(rg, col);
}
void DictStep::projectIntoRowGroup(RowGroup& rg, int64_t* vals, uint32_t col)
{
values = vals;
_projectToRG(rg, col);
}
uint64_t DictStep::getLBID()
{
return 0;
}
void DictStep::nextLBID()
{
}
SCommand DictStep::duplicate()
{
SCommand ret;
DictStep* ds;
ret.reset(new DictStep());
ds = (DictStep*)ret.get();
ds->BOP = BOP;
ds->fFilterFeeder = fFilterFeeder;
ds->compressionType = compressionType;
ds->hasEqFilter = hasEqFilter;
ds->eqFilter = eqFilter;
ds->eqOp = eqOp;
ds->filterString = filterString;
ds->filterCount = filterCount;
ds->charsetNumber = charsetNumber;
ds->Command::operator=(*this);
return ret;
}
bool DictStep::operator==(const DictStep& ds) const
{
return ((BOP == ds.BOP) && (fFilterFeeder == ds.fFilterFeeder) && (compressionType == ds.compressionType) &&
(filterString == ds.filterString) && (filterCount == ds.filterCount));
}
bool DictStep::operator!=(const DictStep& ds) const
{
return !(*this == ds);
}
}; // namespace primitiveprocessor