mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
This patch improves handling of NULLs in textual fields in ColumnStore. Previously empty strings were considered NULLs and it could be a problem if data scheme allows for empty strings. It was also one of major reasons of behavior difference between ColumnStore and other engines in MariaDB family. Also, this patch fixes some other bugs and incorrect behavior, for example, incorrect comparison for "column <= ''" which evaluates to constant True for all purposes before this patch.
713 lines
19 KiB
C++
713 lines
19 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
//
|
|
// $Id: dictstep.cpp 2110 2013-06-19 15:51:38Z bwilkinson $
|
|
// C++ Implementation: dictstep
|
|
//
|
|
// Description:
|
|
//
|
|
//
|
|
// Author: Patrick LeBlanc <pleblanc@calpont.com>, (C) 2008
|
|
//
|
|
// Copyright: See COPYING file that comes with this distribution
|
|
//
|
|
//
|
|
|
|
#include <unistd.h>
|
|
#include <algorithm>
|
|
|
|
#include "bpp.h"
|
|
#include "primitiveserver.h"
|
|
#include "pp_logger.h"
|
|
#include "../linux-port/primitiveprocessor.h"
|
|
|
|
using namespace std;
|
|
using namespace messageqcpp;
|
|
using namespace rowgroup;
|
|
|
|
namespace primitiveprocessor
|
|
{
|
|
extern uint32_t dictBufferSize;
|
|
|
|
DictStep::DictStep() : Command(DICT_STEP), strValues(NULL), filterCount(0), bufferSize(0)
|
|
{
|
|
fMinMax[0] = MAX_UBIGINT;
|
|
fMinMax[1] = MIN_UBIGINT;
|
|
}
|
|
|
|
DictStep::~DictStep()
|
|
{
|
|
}
|
|
|
|
DictStep& DictStep::operator=(const DictStep& d)
|
|
{
|
|
// Not all fields are copied for the sake of efficiency.
|
|
// Right now, these only need to copy fields deserialized from the create msg.
|
|
BOP = d.BOP;
|
|
fFilterFeeder = d.fFilterFeeder;
|
|
compressionType = d.compressionType;
|
|
filterString = d.filterString;
|
|
hasEqFilter = d.hasEqFilter;
|
|
eqFilter = d.eqFilter;
|
|
eqOp = d.eqOp;
|
|
filterCount = d.filterCount;
|
|
charsetNumber = d.charsetNumber;
|
|
fMinMax[0] = d.fMinMax[0];
|
|
fMinMax[1] = d.fMinMax[1];
|
|
return *this;
|
|
}
|
|
|
|
void DictStep::createCommand(ByteStream& bs)
|
|
{
|
|
uint8_t tmp8;
|
|
|
|
bs.advance(1);
|
|
bs >> BOP;
|
|
bs >> tmp8;
|
|
compressionType = tmp8;
|
|
bs >> charsetNumber;
|
|
bs >> filterCount;
|
|
bs >> tmp8;
|
|
hasEqFilter = tmp8;
|
|
|
|
if (hasEqFilter)
|
|
{
|
|
string strTmp;
|
|
datatypes::Charset cs(charsetNumber);
|
|
eqFilter.reset(new primitives::DictEqualityFilter(cs));
|
|
bs >> eqOp;
|
|
|
|
for (uint32_t i = 0; i < filterCount; i++)
|
|
{
|
|
bs >> strTmp;
|
|
eqFilter->insert(strTmp);
|
|
}
|
|
}
|
|
else
|
|
bs >> filterString;
|
|
|
|
Command::createCommand(bs);
|
|
}
|
|
|
|
void DictStep::resetCommand(ByteStream& bs)
|
|
{
|
|
}
|
|
|
|
void DictStep::prep(int8_t outputType, bool makeAbsRids)
|
|
{
|
|
// (at most there are 8192 tokens to fetch)
|
|
bufferSize = sizeof(DictInput) + filterString.length() + (8192 * sizeof(OldGetSigParams));
|
|
inputMsg.reset(new uint8_t[bufferSize]);
|
|
primMsg = (DictInput*)inputMsg.get();
|
|
|
|
primMsg->ism.Interleave = 0;
|
|
primMsg->ism.Flags = 0;
|
|
primMsg->ism.Command = DICT_SIGNATURE;
|
|
primMsg->ism.Size = bufferSize;
|
|
primMsg->ism.Type = 2;
|
|
primMsg->hdr.SessionID = bpp->sessionID;
|
|
// primMsg->hdr.StatementID = 0;
|
|
primMsg->hdr.TransactionID = bpp->txnID;
|
|
primMsg->hdr.VerID = bpp->versionInfo.currentScn;
|
|
primMsg->hdr.StepID = bpp->stepID;
|
|
primMsg->BOP = BOP;
|
|
primMsg->InputFlags = 1; // TODO: Use the new p_Dict functionality
|
|
primMsg->OutputType =
|
|
(eqFilter || filterCount || fFilterFeeder != NOT_FEEDER ? OT_RID : OT_RID | OT_DATAVALUE);
|
|
primMsg->NOPS = (eqFilter ? 0 : filterCount);
|
|
primMsg->NVALS = 0;
|
|
}
|
|
|
|
void DictStep::issuePrimitive(bool isFilter)
|
|
{
|
|
bool wasCached;
|
|
uint32_t blocksRead;
|
|
|
|
if (!(primMsg->LBID & 0x8000000000000000LL))
|
|
{
|
|
// std::cerr << "DS issuePrimitive lbid: " << (uint64_t)primMsg->LBID << endl;
|
|
primitiveprocessor::loadBlock(primMsg->LBID, bpp->versionInfo, bpp->txnID, compressionType,
|
|
bpp->blockData, &wasCached, &blocksRead, bpp->LBIDTrace, bpp->sessionID);
|
|
|
|
if (wasCached)
|
|
bpp->cachedIO++;
|
|
|
|
bpp->physIO += blocksRead;
|
|
bpp->touchedBlocks++;
|
|
}
|
|
#if !defined(XXX_PRIMITIVES_TOKEN_RANGES_XXX)
|
|
bpp->pp.p_Dictionary(primMsg, &result, isFilter, charsetNumber, eqFilter, eqOp);
|
|
#else
|
|
bpp->pp.p_Dictionary(primMsg, &result, isFilter, charsetNumber, eqFilter, eqOp, fMinMax);
|
|
#endif
|
|
}
|
|
|
|
void DictStep::copyResultToTmpSpace(OrderedToken* ot)
|
|
{
|
|
uint32_t i;
|
|
uint8_t* pos;
|
|
uint16_t len;
|
|
uint64_t rid64;
|
|
uint16_t rid16;
|
|
|
|
idbassert(primMsg->OutputType & OT_RID);
|
|
DictOutput* header = (DictOutput*)&result[0];
|
|
|
|
if (header->NVALS == 0)
|
|
return;
|
|
|
|
pos = &result[sizeof(DictOutput)];
|
|
|
|
for (i = 0; i < header->NVALS; i++)
|
|
{
|
|
rid64 = *((uint64_t*)pos);
|
|
pos += 8;
|
|
rid16 = rid64 & 0x1fff;
|
|
ot[rid16].inResult = true;
|
|
tmpResultCounter++;
|
|
|
|
if (primMsg->OutputType & OT_DATAVALUE)
|
|
{
|
|
NullString ns;
|
|
uint8_t isnull = *pos;
|
|
pos += 1;
|
|
len = *((uint16_t*)pos);
|
|
pos += 2;
|
|
if (!isnull) {
|
|
ns.assign(pos, len);
|
|
}
|
|
pos += len;
|
|
ot[rid16].str = ns;
|
|
|
|
//if (rid64 & 0x8000000000000000LL)
|
|
// ot[rid16].str = joblist::CPNULLSTRMARK;
|
|
}
|
|
}
|
|
}
|
|
|
|
void DictStep::copyResultToFinalPosition(OrderedToken* ot)
|
|
{
|
|
uint32_t i, resultPos = 0;
|
|
|
|
for (i = 0; i < inputRidCount; i++)
|
|
{
|
|
if (ot[i].inResult)
|
|
{
|
|
bpp->absRids[resultPos] = ot[i].rid;
|
|
bpp->relRids[resultPos] = ot[i].rid - bpp->baseRid;
|
|
|
|
if (primMsg->OutputType & OT_DATAVALUE)
|
|
(*strValues)[resultPos] = ot[i].str;
|
|
|
|
resultPos++;
|
|
}
|
|
}
|
|
}
|
|
|
|
void DictStep::processResult()
|
|
{
|
|
uint32_t i;
|
|
uint8_t* pos;
|
|
uint16_t len;
|
|
DictOutput* header = (DictOutput*)&result[0];
|
|
|
|
if (header->NVALS == 0)
|
|
return;
|
|
|
|
pos = &result[sizeof(DictOutput)];
|
|
|
|
for (i = 0; i < header->NVALS; i++, tmpResultCounter++)
|
|
{
|
|
if (primMsg->OutputType & OT_RID)
|
|
{
|
|
bpp->absRids[tmpResultCounter] = *((uint64_t*)pos);
|
|
pos += 8;
|
|
// bpp->relRids[tmpResultCounter] = bpp->absRids[tmpResultCounter] & 0x1fff;
|
|
bpp->relRids[tmpResultCounter] = bpp->absRids[tmpResultCounter] - bpp->baseRid;
|
|
}
|
|
|
|
if (primMsg->OutputType & OT_DATAVALUE)
|
|
{
|
|
uint8_t isnull = *pos;
|
|
pos += 1;
|
|
NullString ns;
|
|
len = *((uint16_t*)pos);
|
|
pos += 2;
|
|
if (!isnull)
|
|
{
|
|
ns.assign(pos, len);
|
|
}
|
|
pos += len;
|
|
(*strValues)[tmpResultCounter] = ns;
|
|
}
|
|
|
|
// cout << " stored " << (*strValues)[tmpResultCounter] << endl;
|
|
/* XXXPAT: disclaimer: this is how we do it in DictionaryStep; don't know
|
|
if it's necessary or not yet */
|
|
bpp->absRids[tmpResultCounter] &= 0x7FFFFFFFFFFFFFFFLL;
|
|
}
|
|
}
|
|
|
|
void DictStep::projectResult(string* strings)
|
|
{
|
|
uint32_t i;
|
|
uint8_t* pos;
|
|
uint16_t len;
|
|
DictOutput* header = (DictOutput*)&result[0];
|
|
if (header->NVALS == 0)
|
|
return;
|
|
|
|
pos = &result[sizeof(DictOutput)];
|
|
// cout << "projectResult() l: " << primMsg->LBID << " NVALS: " << header->NVALS << endl;
|
|
for (i = 0; i < header->NVALS; i++)
|
|
{
|
|
len = *((uint16_t*)pos);
|
|
pos += 2;
|
|
strings[tmpResultCounter++] = string((char*)pos, len);
|
|
// cout << "serialized length is " << len << " string is " << strings[tmpResultCounter-1] << " string
|
|
// length = " <<
|
|
// strings[tmpResultCounter-1].length() << endl;
|
|
pos += len;
|
|
totalResultLength += len + 4;
|
|
}
|
|
}
|
|
|
|
// bug4901 -
|
|
// This version of projectResult needs to stay in sync with
|
|
// the above. They are separate methods because the
|
|
// _projectToRG() method can deal with this optimized version
|
|
// where we only need to return the pointer and length. This
|
|
// is desirable because it avoids an unnecessary temporary
|
|
// string copy. The above version is still needed for the
|
|
// _project() method where it has to serialize the totalResultLength
|
|
// before starting to serialize strings.
|
|
void DictStep::projectResult(StringPtr* strings)
|
|
{
|
|
uint32_t i;
|
|
uint8_t* pos;
|
|
uint16_t len;
|
|
DictOutput* header = (DictOutput*)&result[0];
|
|
|
|
if (header->NVALS == 0)
|
|
return;
|
|
|
|
pos = &result[sizeof(DictOutput)];
|
|
|
|
// cout << "projectResult() l: " << primMsg->LBID << " NVALS: " << header->NVALS << endl;
|
|
for (i = 0; i < header->NVALS; i++)
|
|
{
|
|
uint8_t isnull = *pos;
|
|
pos += 1;
|
|
len = *((uint16_t*)pos);
|
|
pos += 2;
|
|
strings[tmpResultCounter++] = StringPtr(isnull ? nullptr : pos, len);
|
|
// cout << "serialized length is " << len << " string is " << strings[tmpResultCounter-1] << " string
|
|
// length = " << strings[tmpResultCounter-1].length() << endl;
|
|
pos += len;
|
|
totalResultLength += len + 4;
|
|
}
|
|
}
|
|
|
|
void DictStep::execute()
|
|
{
|
|
if (fFilterFeeder == LEFT_FEEDER)
|
|
strValues = &(bpp->fFiltStrValues[0]);
|
|
else if (fFilterFeeder == RIGHT_FEEDER)
|
|
strValues = &(bpp->fFiltStrValues[1]);
|
|
else
|
|
strValues = &(bpp->strValues);
|
|
|
|
_execute();
|
|
}
|
|
|
|
void DictStep::_execute()
|
|
{
|
|
/* Need to loop over bpp->values, issuing a primitive for each LBID */
|
|
uint64_t i;
|
|
int64_t l_lbid;
|
|
OldGetSigParams* pt;
|
|
boost::scoped_array<OrderedToken> newRidList;
|
|
|
|
// make the OrderedToken list
|
|
newRidList.reset(new OrderedToken[bpp->ridCount]);
|
|
|
|
for (i = 0; i < bpp->ridCount; i++)
|
|
{
|
|
newRidList[i].rid = bpp->absRids[i];
|
|
newRidList[i].token = bpp->values[i];
|
|
newRidList[i].pos = i;
|
|
}
|
|
|
|
tmpResultCounter = 0;
|
|
i = 0;
|
|
|
|
while (i < bpp->ridCount)
|
|
{
|
|
l_lbid = ((int64_t)newRidList[i].token) >> 10;
|
|
primMsg->LBID = (l_lbid == -1) ? l_lbid : l_lbid & 0xFFFFFFFFFL;
|
|
primMsg->NVALS = 0;
|
|
|
|
/* When this is used as a filter, the strings can be thrown out. JLF currently
|
|
* constructs joblists s.t. only a FilterCommand will use the strings.
|
|
*/
|
|
primMsg->OutputType = (fFilterFeeder == NOT_FEEDER ? OT_RID : OT_RID | OT_DATAVALUE);
|
|
|
|
pt = (OldGetSigParams*)(primMsg->tokens);
|
|
|
|
while (i < bpp->ridCount && ((((int64_t)newRidList[i].token) >> 10) == l_lbid))
|
|
{
|
|
if (UNLIKELY(l_lbid < 0))
|
|
pt[primMsg->NVALS].rid = (fFilterFeeder == NOT_FEEDER ? newRidList[i].rid : i) | 0x8000000000000000LL;
|
|
else
|
|
pt[primMsg->NVALS].rid = (fFilterFeeder == NOT_FEEDER ? newRidList[i].rid : i);
|
|
|
|
pt[primMsg->NVALS].offsetIndex = newRidList[i].token & 0x3ff;
|
|
idbassert(pt[primMsg->NVALS].offsetIndex != 0);
|
|
primMsg->NVALS++;
|
|
i++;
|
|
}
|
|
|
|
memcpy(&pt[primMsg->NVALS], filterString.buf(), filterString.length());
|
|
issuePrimitive(true);
|
|
|
|
if (fFilterFeeder == NOT_FEEDER)
|
|
processResult();
|
|
else
|
|
copyResultToTmpSpace(newRidList.get());
|
|
}
|
|
|
|
inputRidCount = bpp->ridCount;
|
|
bpp->ridCount = tmpResultCounter;
|
|
|
|
// check if feeding a filtercommand
|
|
if (fFilterFeeder != NOT_FEEDER)
|
|
{
|
|
sort(&newRidList[0], &newRidList[inputRidCount], PosSorter());
|
|
copyResultToFinalPosition(newRidList.get());
|
|
copyRidsForFilterCmd();
|
|
}
|
|
if (fMinMax[0] <= fMinMax[1] && bpp->valuesLBID != 0)
|
|
{
|
|
bpp->validCPData = true;
|
|
bpp->cpDataFromDictScan = true;
|
|
bpp->lbidForCP = bpp->valuesLBID;
|
|
bpp->maxVal = fMinMax[1];
|
|
bpp->minVal = fMinMax[0];
|
|
}
|
|
|
|
// cout << "DS: /_execute()\n";
|
|
}
|
|
|
|
/* This will do the same thing as execute() but put the result in bpp->serialized */
|
|
void DictStep::_project()
|
|
{
|
|
/* Need to loop over bpp->values, issuing a primitive for each LBID */
|
|
uint32_t i;
|
|
int64_t l_lbid = 0;
|
|
OldGetSigParams* pt;
|
|
string tmpStrings[LOGICAL_BLOCK_RIDS];
|
|
boost::scoped_array<OrderedToken> newRidList;
|
|
|
|
// make the OrderedToken list
|
|
newRidList.reset(new OrderedToken[bpp->ridCount]);
|
|
|
|
for (i = 0; i < bpp->ridCount; i++)
|
|
{
|
|
newRidList[i].rid = bpp->absRids[i];
|
|
newRidList[i].token = values[i];
|
|
newRidList[i].pos = i;
|
|
}
|
|
|
|
// cout << "DS: _project()\n";
|
|
tmpResultCounter = 0;
|
|
totalResultLength = 0;
|
|
i = 0;
|
|
|
|
while (i < bpp->ridCount)
|
|
{
|
|
l_lbid = ((int64_t)newRidList[i].token) >> 10;
|
|
primMsg->LBID = (l_lbid == -1) ? l_lbid : l_lbid & 0xFFFFFFFFFL;
|
|
primMsg->NVALS = 0;
|
|
primMsg->OutputType = OT_DATAVALUE;
|
|
pt = (OldGetSigParams*)(primMsg->tokens);
|
|
|
|
//@bug 972
|
|
while (i < bpp->ridCount && ((((int64_t)newRidList[i].token) >> 10) == l_lbid))
|
|
{
|
|
if (l_lbid < 0)
|
|
pt[primMsg->NVALS].rid = newRidList[i].rid | 0x8000000000000000LL;
|
|
else
|
|
pt[primMsg->NVALS].rid = newRidList[i].rid;
|
|
|
|
pt[primMsg->NVALS].offsetIndex = newRidList[i].token & 0x3ff;
|
|
idbassert(pt[primMsg->NVALS].offsetIndex > 0);
|
|
primMsg->NVALS++;
|
|
i++;
|
|
}
|
|
|
|
memcpy(&pt[primMsg->NVALS], filterString.buf(), filterString.length());
|
|
issuePrimitive(false);
|
|
projectResult(tmpStrings);
|
|
}
|
|
|
|
idbassert(tmpResultCounter == bpp->ridCount);
|
|
*bpp->serialized << totalResultLength;
|
|
|
|
// cout << "_project() total length = " << totalResultLength << endl;
|
|
for (i = 0; i < tmpResultCounter; i++)
|
|
{
|
|
// cout << "serializing " << tmpStrings[i] << endl;
|
|
*bpp->serialized << tmpStrings[i];
|
|
}
|
|
|
|
// cout << "DS: /_project() l: " << l_lbid << endl;
|
|
}
|
|
|
|
void DictStep::_projectToRG(RowGroup& rg, uint32_t col)
|
|
{
|
|
/* Need to loop over bpp->values, issuing a primitive for each LBID */
|
|
uint32_t i;
|
|
int64_t l_lbid = 0;
|
|
int64_t o_lbid = 0;
|
|
OldGetSigParams* pt;
|
|
StringPtr* tmpStrings = new StringPtr[LOGICAL_BLOCK_RIDS];
|
|
rowgroup::Row r;
|
|
boost::scoped_array<OrderedToken> newRidList;
|
|
|
|
// make the OrderedToken list
|
|
newRidList.reset(new OrderedToken[bpp->ridCount]);
|
|
|
|
for (i = 0; i < bpp->ridCount; i++)
|
|
{
|
|
newRidList[i].rid = bpp->absRids[i];
|
|
newRidList[i].token = values[i];
|
|
newRidList[i].pos = i;
|
|
}
|
|
|
|
sort(&newRidList[0], &newRidList[bpp->ridCount], TokenSorter());
|
|
|
|
rg.initRow(&r);
|
|
uint32_t curResultCounter = 0;
|
|
tmpResultCounter = 0;
|
|
totalResultLength = 0;
|
|
i = 0;
|
|
|
|
// cout << "DS: projectingToRG rids: " << bpp->ridCount << endl;
|
|
while (i < bpp->ridCount)
|
|
{
|
|
l_lbid = ((int64_t)newRidList[i].token) >> 10;
|
|
primMsg->LBID = (l_lbid == -1) ? l_lbid : l_lbid & 0xFFFFFFFFFL;
|
|
primMsg->NVALS = 0;
|
|
primMsg->OutputType = OT_DATAVALUE;
|
|
pt = (OldGetSigParams*)(primMsg->tokens);
|
|
|
|
//@bug 972
|
|
//@bug 1821
|
|
while (i < bpp->ridCount && ((((int64_t)newRidList[i].token) >> 10) == l_lbid || l_lbid == -1 ||
|
|
((((int64_t)newRidList[i].token) >> 10) & 0x8000000000000000LL)))
|
|
{
|
|
//@bug 1821
|
|
if (newRidList[i].token == 0)
|
|
{
|
|
ostringstream oss;
|
|
ostringstream oss2;
|
|
oss << l_lbid;
|
|
logging::Message::Args args;
|
|
args.add("0");
|
|
args.add(oss.str());
|
|
oss2 << newRidList[i].rid;
|
|
args.add(oss2.str());
|
|
primitiveprocessor::mlp->logMessage(logging::M0068, args, true);
|
|
newRidList[i].token = 0xfffffffffffffffeLL;
|
|
values[newRidList[i].pos] = 0xfffffffffffffffeLL;
|
|
}
|
|
|
|
if ((((int64_t)newRidList[i].token) >> 10) < 0)
|
|
{
|
|
pt[primMsg->NVALS].rid = newRidList[i].rid | 0x8000000000000000LL;
|
|
}
|
|
else
|
|
{
|
|
if ((((int64_t)newRidList[i].token) >> 10) > 0 && o_lbid == 0)
|
|
l_lbid = o_lbid = (((int64_t)newRidList[i].token) >> 10);
|
|
|
|
pt[primMsg->NVALS].rid = newRidList[i].rid;
|
|
}
|
|
|
|
pt[primMsg->NVALS].offsetIndex = newRidList[i].token & 0x3ff;
|
|
idbassert(pt[primMsg->NVALS].offsetIndex > 0);
|
|
primMsg->NVALS++;
|
|
// pt++;
|
|
i++;
|
|
}
|
|
|
|
if (((int64_t)primMsg->LBID) < 0 && o_lbid > 0)
|
|
primMsg->LBID = o_lbid & 0xFFFFFFFFFL;
|
|
|
|
memcpy(&pt[primMsg->NVALS], filterString.buf(), filterString.length());
|
|
issuePrimitive(false);
|
|
projectResult(tmpStrings);
|
|
o_lbid = 0;
|
|
// cout << "DS: project & issue l: " << (int64_t)primMsg->LBID << " NVALS: " << primMsg->NVALS << endl;
|
|
|
|
// bug 4901 - move this inside the loop and call incrementally
|
|
// to save the unnecessary string copy
|
|
if ((rg.getColTypes()[col] != execplan::CalpontSystemCatalog::VARBINARY) &&
|
|
(rg.getColTypes()[col] != execplan::CalpontSystemCatalog::BLOB) &&
|
|
(rg.getColTypes()[col] != execplan::CalpontSystemCatalog::TEXT))
|
|
{
|
|
for (i = curResultCounter; i < tmpResultCounter; i++)
|
|
{
|
|
rg.getRow(newRidList[i].pos, &r);
|
|
// std::cerr << "serializing " << tmpStrings[i] << endl;
|
|
r.setStringField(tmpStrings[i].getConstString(), col);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
uint32_t firstTmpResultCounter = tmpResultCounter;
|
|
std::map<uint32_t, string*> result;
|
|
|
|
for (i = curResultCounter; i < firstTmpResultCounter; i++)
|
|
{
|
|
rg.getRow(newRidList[i].pos, &r);
|
|
|
|
// If this is a multi-block blob, get all the blocks
|
|
// We do string copy here, should maybe have a RowGroup
|
|
// function to append strings or something?
|
|
// XXX: can NULLs be a valid value for multipart blob?
|
|
if (((newRidList[i].token >> 46) < 0x3FFFF) && ((newRidList[i].token >> 46) > 0))
|
|
{
|
|
StringPtr multi_part[1];
|
|
uint16_t old_offset = primMsg->tokens[0].offset;
|
|
|
|
if (result.empty())
|
|
{
|
|
// String copy here because tmpStrings pointers will be blown away below
|
|
for (uint32_t x = curResultCounter; x < firstTmpResultCounter; x++)
|
|
{
|
|
result[x] = new string((char*)tmpStrings[x].ptr, tmpStrings[x].len);
|
|
}
|
|
}
|
|
|
|
uint64_t origin_lbid = primMsg->LBID;
|
|
uint32_t lbid_count = newRidList[i].token >> 46;
|
|
primMsg->tokens[0].offset = 1; // first offset of a sig
|
|
|
|
for (uint32_t j = 1; j <= lbid_count; j++)
|
|
{
|
|
tmpResultCounter = 0;
|
|
primMsg->LBID = origin_lbid + j;
|
|
primMsg->NVALS = 1;
|
|
primMsg->tokens[0].LBID = origin_lbid + j;
|
|
issuePrimitive(false);
|
|
projectResult(multi_part);
|
|
result[i]->append((char*)multi_part[0].ptr, multi_part[0].len);
|
|
}
|
|
|
|
primMsg->tokens[0].offset = old_offset;
|
|
primMsg->LBID = origin_lbid;
|
|
tmpResultCounter = firstTmpResultCounter;
|
|
r.setVarBinaryField((unsigned char*)result[i]->c_str(), result[i]->length(), col);
|
|
delete result[i];
|
|
}
|
|
else
|
|
{
|
|
r.setVarBinaryField(tmpStrings[i].ptr, tmpStrings[i].len, col);
|
|
}
|
|
}
|
|
}
|
|
|
|
curResultCounter = tmpResultCounter;
|
|
}
|
|
|
|
// cout << "_projectToRG() total length = " << totalResultLength << endl;
|
|
idbassert(tmpResultCounter == bpp->ridCount);
|
|
|
|
delete[] tmpStrings;
|
|
// cout << "DS: /projectingToRG l: " << (int64_t)primMsg->LBID
|
|
// << " len: " << tmpResultCounter
|
|
// << endl;
|
|
}
|
|
|
|
void DictStep::project()
|
|
{
|
|
values = bpp->values;
|
|
_project();
|
|
}
|
|
|
|
void DictStep::project(int64_t* vals)
|
|
{
|
|
values = vals;
|
|
_project();
|
|
}
|
|
|
|
void DictStep::projectIntoRowGroup(RowGroup& rg, uint32_t col)
|
|
{
|
|
values = bpp->values;
|
|
_projectToRG(rg, col);
|
|
}
|
|
|
|
void DictStep::projectIntoRowGroup(RowGroup& rg, int64_t* vals, uint32_t col)
|
|
{
|
|
values = vals;
|
|
_projectToRG(rg, col);
|
|
}
|
|
|
|
uint64_t DictStep::getLBID()
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
void DictStep::nextLBID()
|
|
{
|
|
}
|
|
|
|
SCommand DictStep::duplicate()
|
|
{
|
|
SCommand ret;
|
|
DictStep* ds;
|
|
|
|
ret.reset(new DictStep());
|
|
ds = (DictStep*)ret.get();
|
|
ds->BOP = BOP;
|
|
ds->fFilterFeeder = fFilterFeeder;
|
|
ds->compressionType = compressionType;
|
|
ds->hasEqFilter = hasEqFilter;
|
|
ds->eqFilter = eqFilter;
|
|
ds->eqOp = eqOp;
|
|
ds->filterString = filterString;
|
|
ds->filterCount = filterCount;
|
|
ds->charsetNumber = charsetNumber;
|
|
ds->Command::operator=(*this);
|
|
return ret;
|
|
}
|
|
|
|
bool DictStep::operator==(const DictStep& ds) const
|
|
{
|
|
return ((BOP == ds.BOP) && (fFilterFeeder == ds.fFilterFeeder) && (compressionType == ds.compressionType) &&
|
|
(filterString == ds.filterString) && (filterCount == ds.filterCount));
|
|
}
|
|
|
|
bool DictStep::operator!=(const DictStep& ds) const
|
|
{
|
|
return !(*this == ds);
|
|
}
|
|
|
|
}; // namespace primitiveprocessor
|