1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-06-07 19:22:02 +03:00
2025-05-15 19:05:38 +04:00

714 lines
19 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
//
// $Id: dictstep.cpp 2110 2013-06-19 15:51:38Z bwilkinson $
// C++ Implementation: dictstep
//
// Description:
//
//
// Author: Patrick LeBlanc <pleblanc@calpont.com>, (C) 2008
//
// Copyright: See COPYING file that comes with this distribution
//
//
#include <unistd.h>
#include <algorithm>
#include "bpp.h"
#include "primitiveserver.h"
#include "pp_logger.h"
#include "../linux-port/primitiveprocessor.h"
using namespace std;
using namespace messageqcpp;
using namespace rowgroup;
namespace primitiveprocessor
{
extern uint32_t dictBufferSize;
DictStep::DictStep() : Command(DICT_STEP), strValues(NULL), filterCount(0), bufferSize(0)
{
fMinMax[0] = MAX_UBIGINT;
fMinMax[1] = MIN_UBIGINT;
}
DictStep::~DictStep()
{
}
DictStep& DictStep::operator=(const DictStep& d)
{
// Not all fields are copied for the sake of efficiency.
// Right now, these only need to copy fields deserialized from the create msg.
BOP = d.BOP;
fFilterFeeder = d.fFilterFeeder;
compressionType = d.compressionType;
filterString = d.filterString;
hasEqFilter = d.hasEqFilter;
eqFilter = d.eqFilter;
eqOp = d.eqOp;
filterCount = d.filterCount;
charsetNumber = d.charsetNumber;
fMinMax[0] = d.fMinMax[0];
fMinMax[1] = d.fMinMax[1];
return *this;
}
void DictStep::createCommand(ByteStream& bs)
{
uint8_t tmp8;
bs.advance(1);
bs >> BOP;
bs >> tmp8;
compressionType = tmp8;
bs >> charsetNumber;
bs >> filterCount;
bs >> tmp8;
hasEqFilter = tmp8;
if (hasEqFilter)
{
string strTmp;
datatypes::Charset cs(charsetNumber);
eqFilter.reset(new primitives::DictEqualityFilter(cs));
bs >> eqOp;
for (uint32_t i = 0; i < filterCount; i++)
{
bs >> strTmp;
eqFilter->insert(strTmp);
}
}
else
bs >> filterString;
Command::createCommand(bs);
}
void DictStep::resetCommand(ByteStream& /*bs*/)
{
}
void DictStep::prep(int8_t /*outputType*/, bool /*makeAbsRids*/)
{
// (at most there are 8192 tokens to fetch)
bufferSize = sizeof(DictInput) + filterString.length() + (8192 * sizeof(OldGetSigParams));
inputMsg.reset(new uint8_t[bufferSize]);
primMsg = (DictInput*)inputMsg.get();
primMsg->ism.Interleave = 0;
primMsg->ism.Flags = 0;
primMsg->ism.Command = DICT_SIGNATURE;
primMsg->ism.Size = bufferSize;
primMsg->ism.Type = 2;
primMsg->hdr.SessionID = bpp->sessionID;
// primMsg->hdr.StatementID = 0;
primMsg->hdr.TransactionID = bpp->txnID;
primMsg->hdr.VerID = bpp->versionInfo.currentScn;
primMsg->hdr.StepID = bpp->stepID;
primMsg->BOP = BOP;
primMsg->InputFlags = 1; // TODO: Use the new p_Dict functionality
primMsg->OutputType =
(eqFilter || filterCount || fFilterFeeder != NOT_FEEDER ? OT_RID : OT_RID | OT_DATAVALUE);
primMsg->NOPS = (eqFilter ? 0 : filterCount);
primMsg->NVALS = 0;
}
void DictStep::issuePrimitive(bool isFilter)
{
bool wasCached;
uint32_t blocksRead;
if (!(primMsg->LBID & 0x8000000000000000LL))
{
// std::cerr << "DS issuePrimitive lbid: " << (uint64_t)primMsg->LBID << endl;
primitiveprocessor::loadBlock(primMsg->LBID, bpp->versionInfo, bpp->txnID, compressionType,
bpp->blockData, &wasCached, &blocksRead, bpp->LBIDTrace, bpp->sessionID);
if (wasCached)
bpp->cachedIO++;
bpp->physIO += blocksRead;
bpp->touchedBlocks++;
}
#if !defined(XXX_PRIMITIVES_TOKEN_RANGES_XXX)
bpp->pp.p_Dictionary(primMsg, &result, isFilter, charsetNumber, eqFilter, eqOp);
#else
bpp->pp.p_Dictionary(primMsg, &result, isFilter, charsetNumber, eqFilter, eqOp, fMinMax);
#endif
}
void DictStep::copyResultToTmpSpace(OrderedToken* ot)
{
uint32_t i;
uint8_t* pos;
uint16_t len;
uint64_t rid64;
uint16_t rid16;
idbassert(primMsg->OutputType & OT_RID);
DictOutput* header = (DictOutput*)&result[0];
if (header->NVALS == 0)
return;
pos = &result[sizeof(DictOutput)];
for (i = 0; i < header->NVALS; i++)
{
rid64 = *((uint64_t*)pos);
pos += 8;
rid16 = rid64 & 0x1fff;
ot[rid16].inResult = true;
tmpResultCounter++;
if (primMsg->OutputType & OT_DATAVALUE)
{
NullString ns;
uint8_t isnull = *pos;
pos += 1;
len = *((uint16_t*)pos);
pos += 2;
if (!isnull)
{
ns.assign(pos, len);
}
pos += len;
ot[rid16].str = ns;
// if (rid64 & 0x8000000000000000LL)
// ot[rid16].str = joblist::CPNULLSTRMARK;
}
}
}
void DictStep::copyResultToFinalPosition(OrderedToken* ot)
{
uint32_t i, resultPos = 0;
for (i = 0; i < inputRidCount; i++)
{
if (ot[i].inResult)
{
bpp->absRids[resultPos] = ot[i].rid;
bpp->relRids[resultPos] = ot[i].rid - bpp->baseRid;
if (primMsg->OutputType & OT_DATAVALUE)
(*strValues)[resultPos] = ot[i].str;
resultPos++;
}
}
}
void DictStep::processResult()
{
uint32_t i;
uint8_t* pos;
uint16_t len;
DictOutput* header = (DictOutput*)&result[0];
if (header->NVALS == 0)
return;
pos = &result[sizeof(DictOutput)];
for (i = 0; i < header->NVALS; i++, tmpResultCounter++)
{
if (primMsg->OutputType & OT_RID)
{
bpp->absRids[tmpResultCounter] = *((uint64_t*)pos);
pos += 8;
// bpp->relRids[tmpResultCounter] = bpp->absRids[tmpResultCounter] & 0x1fff;
bpp->relRids[tmpResultCounter] = bpp->absRids[tmpResultCounter] - bpp->baseRid;
}
if (primMsg->OutputType & OT_DATAVALUE)
{
uint8_t isnull = *pos;
pos += 1;
NullString ns;
len = *((uint16_t*)pos);
pos += 2;
if (!isnull)
{
ns.assign(pos, len);
}
pos += len;
(*strValues)[tmpResultCounter] = ns;
}
// cout << " stored " << (*strValues)[tmpResultCounter] << endl;
/* XXXPAT: disclaimer: this is how we do it in DictionaryStep; don't know
if it's necessary or not yet */
bpp->absRids[tmpResultCounter] &= 0x7FFFFFFFFFFFFFFFLL;
}
}
void DictStep::projectResult(string* strings)
{
uint32_t i;
uint8_t* pos;
uint16_t len;
DictOutput* header = (DictOutput*)&result[0];
if (header->NVALS == 0)
return;
pos = &result[sizeof(DictOutput)];
// cout << "projectResult() l: " << primMsg->LBID << " NVALS: " << header->NVALS << endl;
for (i = 0; i < header->NVALS; i++)
{
len = *((uint16_t*)pos);
pos += 2;
strings[tmpResultCounter++] = string((char*)pos, len);
// cout << "serialized length is " << len << " string is " << strings[tmpResultCounter-1] << " string
// length = " <<
// strings[tmpResultCounter-1].length() << endl;
pos += len;
totalResultLength += len + 4;
}
}
// bug4901 -
// This version of projectResult needs to stay in sync with
// the above. They are separate methods because the
// _projectToRG() method can deal with this optimized version
// where we only need to return the pointer and length. This
// is desirable because it avoids an unnecessary temporary
// string copy. The above version is still needed for the
// _project() method where it has to serialize the totalResultLength
// before starting to serialize strings.
void DictStep::projectResult(StringPtr* strings)
{
uint32_t i;
uint8_t* pos;
uint16_t len;
DictOutput* header = (DictOutput*)&result[0];
if (header->NVALS == 0)
return;
pos = &result[sizeof(DictOutput)];
// cout << "projectResult() l: " << primMsg->LBID << " NVALS: " << header->NVALS << endl;
for (i = 0; i < header->NVALS; i++)
{
uint8_t isnull = *pos;
pos += 1;
len = *((uint16_t*)pos);
pos += 2;
strings[tmpResultCounter++] = StringPtr(isnull ? nullptr : pos, len);
// cout << "serialized length is " << len << " string is " << strings[tmpResultCounter-1] << " string
// length = " << strings[tmpResultCounter-1].length() << endl;
pos += len;
totalResultLength += len + 4;
}
}
void DictStep::execute()
{
if (fFilterFeeder == LEFT_FEEDER)
strValues = &(bpp->fFiltStrValues[0]);
else if (fFilterFeeder == RIGHT_FEEDER)
strValues = &(bpp->fFiltStrValues[1]);
else
strValues = &(bpp->strValues);
_execute();
}
void DictStep::_execute()
{
/* Need to loop over bpp->values, issuing a primitive for each LBID */
uint64_t i;
int64_t l_lbid;
OldGetSigParams* pt;
boost::scoped_array<OrderedToken> newRidList;
// make the OrderedToken list
newRidList.reset(new OrderedToken[bpp->ridCount]);
for (i = 0; i < bpp->ridCount; i++)
{
newRidList[i].rid = bpp->absRids[i];
newRidList[i].token = bpp->values[i];
newRidList[i].pos = i;
}
tmpResultCounter = 0;
i = 0;
while (i < bpp->ridCount)
{
l_lbid = ((int64_t)newRidList[i].token) >> 10;
primMsg->LBID = (l_lbid == -1) ? l_lbid : l_lbid & 0xFFFFFFFFFL;
primMsg->NVALS = 0;
/* When this is used as a filter, the strings can be thrown out. JLF currently
* constructs joblists s.t. only a FilterCommand will use the strings.
*/
primMsg->OutputType = (fFilterFeeder == NOT_FEEDER ? OT_RID : OT_RID | OT_DATAVALUE);
pt = (OldGetSigParams*)(primMsg->tokens);
while (i < bpp->ridCount && ((((int64_t)newRidList[i].token) >> 10) == l_lbid))
{
if (UNLIKELY(l_lbid < 0))
pt[primMsg->NVALS].rid = (fFilterFeeder == NOT_FEEDER ? newRidList[i].rid : i) | 0x8000000000000000LL;
else
pt[primMsg->NVALS].rid = (fFilterFeeder == NOT_FEEDER ? newRidList[i].rid : i);
pt[primMsg->NVALS].offsetIndex = newRidList[i].token & 0x3ff;
idbassert(pt[primMsg->NVALS].offsetIndex != 0);
primMsg->NVALS++;
i++;
}
memcpy(&pt[primMsg->NVALS], filterString.buf(), filterString.length());
issuePrimitive(true);
if (fFilterFeeder == NOT_FEEDER)
processResult();
else
copyResultToTmpSpace(newRidList.get());
}
inputRidCount = bpp->ridCount;
bpp->ridCount = tmpResultCounter;
// check if feeding a filtercommand
if (fFilterFeeder != NOT_FEEDER)
{
sort(&newRidList[0], &newRidList[inputRidCount], PosSorter());
copyResultToFinalPosition(newRidList.get());
copyRidsForFilterCmd();
}
if (fMinMax[0] <= fMinMax[1] && bpp->valuesLBID != 0)
{
bpp->validCPData = true;
bpp->cpDataFromDictScan = true;
bpp->lbidForCP = bpp->valuesLBID;
bpp->maxVal = fMinMax[1];
bpp->minVal = fMinMax[0];
}
// cout << "DS: /_execute()\n";
}
/* This will do the same thing as execute() but put the result in bpp->serialized */
void DictStep::_project(messageqcpp::SBS& bs)
{
/* Need to loop over bpp->values, issuing a primitive for each LBID */
uint32_t i;
int64_t l_lbid = 0;
OldGetSigParams* pt;
string tmpStrings[LOGICAL_BLOCK_RIDS];
boost::scoped_array<OrderedToken> newRidList;
// make the OrderedToken list
newRidList.reset(new OrderedToken[bpp->ridCount]);
for (i = 0; i < bpp->ridCount; i++)
{
newRidList[i].rid = bpp->absRids[i];
newRidList[i].token = values[i];
newRidList[i].pos = i;
}
// cout << "DS: _project()\n";
tmpResultCounter = 0;
totalResultLength = 0;
i = 0;
while (i < bpp->ridCount)
{
l_lbid = ((int64_t)newRidList[i].token) >> 10;
primMsg->LBID = (l_lbid == -1) ? l_lbid : l_lbid & 0xFFFFFFFFFL;
primMsg->NVALS = 0;
primMsg->OutputType = OT_DATAVALUE;
pt = (OldGetSigParams*)(primMsg->tokens);
//@bug 972
while (i < bpp->ridCount && ((((int64_t)newRidList[i].token) >> 10) == l_lbid))
{
if (l_lbid < 0)
pt[primMsg->NVALS].rid = newRidList[i].rid | 0x8000000000000000LL;
else
pt[primMsg->NVALS].rid = newRidList[i].rid;
pt[primMsg->NVALS].offsetIndex = newRidList[i].token & 0x3ff;
idbassert(pt[primMsg->NVALS].offsetIndex > 0);
primMsg->NVALS++;
i++;
}
memcpy(&pt[primMsg->NVALS], filterString.buf(), filterString.length());
issuePrimitive(false);
projectResult(tmpStrings);
}
idbassert(tmpResultCounter == bpp->ridCount);
*bs << totalResultLength;
// cout << "_project() total length = " << totalResultLength << endl;
for (i = 0; i < tmpResultCounter; i++)
{
// cout << "serializing " << tmpStrings[i] << endl;
*bs << tmpStrings[i];
}
// cout << "DS: /_project() l: " << l_lbid << endl;
}
void DictStep::_projectToRG(RowGroup& rg, uint32_t col)
{
/* Need to loop over bpp->values, issuing a primitive for each LBID */
uint32_t i;
int64_t l_lbid = 0;
int64_t o_lbid = 0;
OldGetSigParams* pt;
StringPtr* tmpStrings = new StringPtr[LOGICAL_BLOCK_RIDS];
rowgroup::Row r;
boost::scoped_array<OrderedToken> newRidList;
// make the OrderedToken list
newRidList.reset(new OrderedToken[bpp->ridCount]);
for (i = 0; i < bpp->ridCount; i++)
{
newRidList[i].rid = bpp->absRids[i];
newRidList[i].token = values[i];
newRidList[i].pos = i;
}
sort(&newRidList[0], &newRidList[bpp->ridCount], TokenSorter());
rg.initRow(&r);
uint32_t curResultCounter = 0;
tmpResultCounter = 0;
totalResultLength = 0;
i = 0;
// cout << "DS: projectingToRG rids: " << bpp->ridCount << endl;
while (i < bpp->ridCount)
{
l_lbid = ((int64_t)newRidList[i].token) >> 10;
primMsg->LBID = (l_lbid == -1) ? l_lbid : l_lbid & 0xFFFFFFFFFL;
primMsg->NVALS = 0;
primMsg->OutputType = OT_DATAVALUE;
pt = (OldGetSigParams*)(primMsg->tokens);
//@bug 972
//@bug 1821
while (i < bpp->ridCount && ((((int64_t)newRidList[i].token) >> 10) == l_lbid || l_lbid == -1 ||
((((int64_t)newRidList[i].token) >> 10) & 0x8000000000000000LL)))
{
//@bug 1821
if (newRidList[i].token == 0)
{
ostringstream oss;
ostringstream oss2;
oss << l_lbid;
logging::Message::Args args;
args.add("0");
args.add(oss.str());
oss2 << newRidList[i].rid;
args.add(oss2.str());
primitiveprocessor::mlp->logMessage(logging::M0068, args, true);
newRidList[i].token = 0xfffffffffffffffeLL;
values[newRidList[i].pos] = 0xfffffffffffffffeLL;
}
if ((((int64_t)newRidList[i].token) >> 10) < 0)
{
pt[primMsg->NVALS].rid = newRidList[i].rid | 0x8000000000000000LL;
}
else
{
if ((((int64_t)newRidList[i].token) >> 10) > 0 && o_lbid == 0)
l_lbid = o_lbid = (((int64_t)newRidList[i].token) >> 10);
pt[primMsg->NVALS].rid = newRidList[i].rid;
}
pt[primMsg->NVALS].offsetIndex = newRidList[i].token & 0x3ff;
idbassert(pt[primMsg->NVALS].offsetIndex > 0);
primMsg->NVALS++;
// pt++;
i++;
}
if (((int64_t)primMsg->LBID) < 0 && o_lbid > 0)
primMsg->LBID = o_lbid & 0xFFFFFFFFFL;
memcpy(&pt[primMsg->NVALS], filterString.buf(), filterString.length());
issuePrimitive(false);
projectResult(tmpStrings);
o_lbid = 0;
// cout << "DS: project & issue l: " << (int64_t)primMsg->LBID << " NVALS: " << primMsg->NVALS << endl;
// bug 4901 - move this inside the loop and call incrementally
// to save the unnecessary string copy
if ((rg.getColTypes()[col] != execplan::CalpontSystemCatalog::VARBINARY) &&
(rg.getColTypes()[col] != execplan::CalpontSystemCatalog::BLOB) &&
(rg.getColTypes()[col] != execplan::CalpontSystemCatalog::TEXT))
{
for (i = curResultCounter; i < tmpResultCounter; i++)
{
rg.getRow(newRidList[i].pos, &r);
// std::cerr << "serializing " << tmpStrings[i] << endl;
r.setStringField(tmpStrings[i].getConstString(), col);
}
}
else
{
uint32_t firstTmpResultCounter = tmpResultCounter;
std::map<uint32_t, string*> result;
for (i = curResultCounter; i < firstTmpResultCounter; i++)
{
rg.getRow(newRidList[i].pos, &r);
// If this is a multi-block blob, get all the blocks
// We do string copy here, should maybe have a RowGroup
// function to append strings or something?
// XXX: can NULLs be a valid value for multipart blob?
if (((newRidList[i].token >> 46) < 0x3FFFF) && ((newRidList[i].token >> 46) > 0))
{
StringPtr multi_part[1];
uint16_t old_offset = primMsg->tokens[0].offset;
if (result.empty())
{
// String copy here because tmpStrings pointers will be blown away below
for (uint32_t x = curResultCounter; x < firstTmpResultCounter; x++)
{
result[x] = new string((char*)tmpStrings[x].ptr, tmpStrings[x].len);
}
}
uint64_t origin_lbid = primMsg->LBID;
uint32_t lbid_count = newRidList[i].token >> 46;
primMsg->tokens[0].offset = 1; // first offset of a sig
for (uint32_t j = 1; j <= lbid_count; j++)
{
tmpResultCounter = 0;
primMsg->LBID = origin_lbid + j;
primMsg->NVALS = 1;
primMsg->tokens[0].LBID = origin_lbid + j;
issuePrimitive(false);
projectResult(multi_part);
result[i]->append((char*)multi_part[0].ptr, multi_part[0].len);
}
primMsg->tokens[0].offset = old_offset;
primMsg->LBID = origin_lbid;
tmpResultCounter = firstTmpResultCounter;
r.setVarBinaryField((unsigned char*)result[i]->c_str(), result[i]->length(), col);
delete result[i];
}
else
{
r.setVarBinaryField(tmpStrings[i].ptr, tmpStrings[i].len, col);
}
}
}
curResultCounter = tmpResultCounter;
}
// cout << "_projectToRG() total length = " << totalResultLength << endl;
idbassert(tmpResultCounter == bpp->ridCount);
delete[] tmpStrings;
// cout << "DS: /projectingToRG l: " << (int64_t)primMsg->LBID
// << " len: " << tmpResultCounter
// << endl;
}
void DictStep::project(messageqcpp::SBS& bs)
{
values = bpp->values;
_project(bs);
}
void DictStep::project(messageqcpp::SBS& bs, int64_t* vals)
{
values = vals;
_project(bs);
}
void DictStep::projectIntoRowGroup(RowGroup& rg, uint32_t col)
{
values = bpp->values;
_projectToRG(rg, col);
}
void DictStep::projectIntoRowGroup(RowGroup& rg, int64_t* vals, uint32_t col)
{
values = vals;
_projectToRG(rg, col);
}
uint64_t DictStep::getLBID()
{
return 0;
}
void DictStep::nextLBID()
{
}
SCommand DictStep::duplicate()
{
SCommand ret;
DictStep* ds;
ret.reset(new DictStep());
ds = (DictStep*)ret.get();
ds->BOP = BOP;
ds->fFilterFeeder = fFilterFeeder;
ds->compressionType = compressionType;
ds->hasEqFilter = hasEqFilter;
ds->eqFilter = eqFilter;
ds->eqOp = eqOp;
ds->filterString = filterString;
ds->filterCount = filterCount;
ds->charsetNumber = charsetNumber;
ds->Command::operator=(*this);
return ret;
}
bool DictStep::operator==(const DictStep& ds) const
{
return ((BOP == ds.BOP) && (fFilterFeeder == ds.fFilterFeeder) && (compressionType == ds.compressionType) &&
(filterString == ds.filterString) && (filterCount == ds.filterCount));
}
bool DictStep::operator!=(const DictStep& ds) const
{
return !(*this == ds);
}
}; // namespace primitiveprocessor