1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-5522 Properly process pm join result count. (#2909)

This patch:
1. Properly processes situation when pm join result count is exceeded.
2. Adds session variable `columnstore_max_pm_join_result_count` to control the limit.
This commit is contained in:
Denis Khalikov
2023-08-04 16:55:45 +03:00
committed by GitHub
parent 4f580d109d
commit 896e8dd769
17 changed files with 144 additions and 60 deletions

View File

@ -89,6 +89,7 @@ CalpontSelectExecutionPlan::CalpontSelectExecutionPlan(const int location)
, fDJSPartitionSize(100 * 1024 * 1024) , fDJSPartitionSize(100 * 1024 * 1024)
, fDJSMaxPartitionTreeDepth(8) , fDJSMaxPartitionTreeDepth(8)
, fDJSForceRun(false) , fDJSForceRun(false)
, fMaxPmJoinResultCount(1048576)
, // 100MB mem usage for disk based join, , // 100MB mem usage for disk based join,
fUMMemLimit(numeric_limits<int64_t>::max()) fUMMemLimit(numeric_limits<int64_t>::max())
, fIsDML(false) , fIsDML(false)
@ -463,6 +464,7 @@ void CalpontSelectExecutionPlan::serialize(messageqcpp::ByteStream& b) const
b << fDJSPartitionSize; b << fDJSPartitionSize;
b << fDJSMaxPartitionTreeDepth; b << fDJSMaxPartitionTreeDepth;
b << (uint8_t)fDJSForceRun; b << (uint8_t)fDJSForceRun;
b << (uint32_t)fMaxPmJoinResultCount;
b << fUMMemLimit; b << fUMMemLimit;
b << (uint8_t)fIsDML; b << (uint8_t)fIsDML;
messageqcpp::ByteStream::octbyte timeZone = fTimeZone; messageqcpp::ByteStream::octbyte timeZone = fTimeZone;
@ -661,6 +663,7 @@ void CalpontSelectExecutionPlan::unserialize(messageqcpp::ByteStream& b)
b >> fDJSPartitionSize; b >> fDJSPartitionSize;
b >> fDJSMaxPartitionTreeDepth; b >> fDJSMaxPartitionTreeDepth;
b >> (uint8_t&)fDJSForceRun; b >> (uint8_t&)fDJSForceRun;
b >> (uint32_t&)fMaxPmJoinResultCount;
b >> fUMMemLimit; b >> fUMMemLimit;
b >> tmp8; b >> tmp8;
fIsDML = tmp8; fIsDML = tmp8;

View File

@ -708,6 +708,15 @@ class CalpontSelectExecutionPlan : public CalpontExecutionPlan
return fDJSForceRun; return fDJSForceRun;
} }
// Sets the maximum number of PM join result rows produced per processed block.
void maxPmJoinResultCount(uint32_t value)
{
  fMaxPmJoinResultCount = value;
}
// Returns the PM join result count limit.
// Marked const: a read-only accessor must be callable on const instances.
uint32_t maxPmJoinResultCount() const
{
  return fMaxPmJoinResultCount;
}
void umMemLimit(uint64_t l) void umMemLimit(uint64_t l)
{ {
fUMMemLimit = l; fUMMemLimit = l;
@ -942,6 +951,7 @@ class CalpontSelectExecutionPlan : public CalpontExecutionPlan
uint64_t fDJSPartitionSize = 100 * 1024 * 1024; uint64_t fDJSPartitionSize = 100 * 1024 * 1024;
uint32_t fDJSMaxPartitionTreeDepth = 8; uint32_t fDJSMaxPartitionTreeDepth = 8;
bool fDJSForceRun = false; bool fDJSForceRun = false;
uint32_t fMaxPmJoinResultCount = 1048576;
int64_t fUMMemLimit = numeric_limits<int64_t>::max(); int64_t fUMMemLimit = numeric_limits<int64_t>::max();
bool fIsDML = false; bool fIsDML = false;
long fTimeZone = 0; long fTimeZone = 0;

View File

@ -1105,6 +1105,7 @@ void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const
/* if HAS_JOINER, send the init params */ /* if HAS_JOINER, send the init params */
if (flags & HAS_JOINER) if (flags & HAS_JOINER)
{ {
bs << (uint32_t)maxPmJoinResultCount;
if (ot == ROW_GROUP) if (ot == ROW_GROUP)
{ {
idbassert(tJoiners.size() > 0); idbassert(tJoiners.size() > 0);

View File

@ -252,6 +252,11 @@ class BatchPrimitiveProcessorJL
uuid = u; uuid = u;
} }
// Stores the limit on how many join result rows a single BPP block may
// accumulate before processing is cut off and resumed by the caller.
void setMaxPmJoinResultCount(uint32_t count)
{
  this->maxPmJoinResultCount = count;
}
private: private:
const size_t perColumnProjectWeight_ = 10; const size_t perColumnProjectWeight_ = 10;
const size_t perColumnFilteringWeight_ = 10; const size_t perColumnFilteringWeight_ = 10;
@ -374,6 +379,7 @@ class BatchPrimitiveProcessorJL
unsigned fJoinerChunkSize; unsigned fJoinerChunkSize;
uint32_t dbRoot; uint32_t dbRoot;
bool hasSmallOuterJoin; bool hasSmallOuterJoin;
uint32_t maxPmJoinResultCount = 1048576;
uint32_t _priority; uint32_t _priority;

View File

@ -211,6 +211,7 @@ struct JobInfo
, wfqLimitStart(0) , wfqLimitStart(0)
, wfqLimitCount(-1) , wfqLimitCount(-1)
, timeZone(0) , timeZone(0)
, maxPmJoinResultCount(1048576)
{ {
} }
ResourceManager* rm; ResourceManager* rm;
@ -368,6 +369,7 @@ struct JobInfo
bool djsForceRun; bool djsForceRun;
bool isDML; bool isDML;
long timeZone; long timeZone;
uint32_t maxPmJoinResultCount;
// This is for tracking any dynamically allocated ParseTree objects // This is for tracking any dynamically allocated ParseTree objects
// in simpleScalarFilterToParseTree() for later deletion in // in simpleScalarFilterToParseTree() for later deletion in

View File

@ -2068,6 +2068,7 @@ SJLP makeJobList_(CalpontExecutionPlan* cplan, ResourceManager* rm,
jobInfo.partitionSize = csep->djsPartitionSize(); jobInfo.partitionSize = csep->djsPartitionSize();
jobInfo.djsMaxPartitionTreeDepth = csep->djsMaxPartitionTreeDepth(); jobInfo.djsMaxPartitionTreeDepth = csep->djsMaxPartitionTreeDepth();
jobInfo.djsForceRun = csep->djsForceRun(); jobInfo.djsForceRun = csep->djsForceRun();
jobInfo.maxPmJoinResultCount = csep->maxPmJoinResultCount();
jobInfo.umMemLimit.reset(new int64_t); jobInfo.umMemLimit.reset(new int64_t);
*(jobInfo.umMemLimit) = csep->umMemLimit(); *(jobInfo.umMemLimit) = csep->umMemLimit();
jobInfo.isDML = csep->isDML(); jobInfo.isDML = csep->isDML();

View File

@ -91,6 +91,7 @@ JobStep::JobStep(const JobInfo& j)
, fProgress(0) , fProgress(0)
, fStartTime(-1) , fStartTime(-1)
, fTimeZone(j.timeZone) , fTimeZone(j.timeZone)
, fMaxPmJoinResultCount(j.maxPmJoinResultCount)
{ {
QueryTeleServerParms tsp; QueryTeleServerParms tsp;
string teleServerHost(Config::makeConfig()->getConfig("QueryTele", "Host")); string teleServerHost(Config::makeConfig()->getConfig("QueryTele", "Host"));

View File

@ -497,6 +497,7 @@ class JobStep
int64_t fStartTime; int64_t fStartTime;
int64_t fLastStepTeleTime; int64_t fLastStepTeleTime;
long fTimeZone; long fTimeZone;
uint32_t fMaxPmJoinResultCount;
private: private:
static boost::mutex fLogMutex; static boost::mutex fLogMutex;

View File

@ -1198,6 +1198,11 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep
return bRunFEonPM; return bRunFEonPM;
} }
// Records the PM join result count limit so it can be forwarded to the
// BatchPrimitiveProcessorJL when the join is set up in run().
void setMaxPmJoinResultCount(uint32_t count)
{
  this->maxPmJoinResultCount = count;
}
protected: protected:
void sendError(uint16_t status); void sendError(uint16_t status);
@ -1339,6 +1344,8 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep
boost::shared_ptr<RowGroupDL> deliveryDL; boost::shared_ptr<RowGroupDL> deliveryDL;
uint32_t deliveryIt; uint32_t deliveryIt;
uint32_t maxPmJoinResultCount;
class JoinLocalData class JoinLocalData
{ {
public: public:

View File

@ -1458,9 +1458,13 @@ void TupleBPS::run()
fBPP->setThreadCount(fMaxNumProcessorThreads); fBPP->setThreadCount(fMaxNumProcessorThreads);
if (doJoin) if (doJoin)
{
for (i = 0; i < smallSideCount; i++) for (i = 0; i < smallSideCount; i++)
tjoiners[i]->setThreadCount(fMaxNumProcessorThreads); tjoiners[i]->setThreadCount(fMaxNumProcessorThreads);
fBPP->setMaxPmJoinResultCount(fMaxPmJoinResultCount);
}
if (fe1) if (fe1)
fBPP->setFEGroup1(fe1, fe1Input); fBPP->setFEGroup1(fe1, fe1Input);

View File

@ -6642,6 +6642,7 @@ void setExecutionParams(gp_walk_info& gwi, SCSEP& csep)
csep->djsPartitionSize(get_diskjoin_bucketsize(gwi.thd) * 1024ULL * 1024); csep->djsPartitionSize(get_diskjoin_bucketsize(gwi.thd) * 1024ULL * 1024);
csep->djsMaxPartitionTreeDepth(get_diskjoin_max_partition_tree_depth(gwi.thd)); csep->djsMaxPartitionTreeDepth(get_diskjoin_max_partition_tree_depth(gwi.thd));
csep->djsForceRun(get_diskjoin_force_run(gwi.thd)); csep->djsForceRun(get_diskjoin_force_run(gwi.thd));
csep->maxPmJoinResultCount(get_max_pm_join_result_count(gwi.thd));
if (get_um_mem_limit(gwi.thd) == 0) if (get_um_mem_limit(gwi.thd) == 0)
csep->umMemLimit(numeric_limits<int64_t>::max()); csep->umMemLimit(numeric_limits<int64_t>::max());
else else

View File

@ -141,6 +141,10 @@ static MYSQL_THDVAR_ULONG(diskjoin_max_partition_tree_depth, PLUGIN_VAR_RQCMDARG
static MYSQL_THDVAR_BOOL(diskjoin_force_run, PLUGIN_VAR_RQCMDARG, "Force run for the disk join step.", NULL, static MYSQL_THDVAR_BOOL(diskjoin_force_run, PLUGIN_VAR_RQCMDARG, "Force run for the disk join step.", NULL,
NULL, 0); NULL, 0);
static MYSQL_THDVAR_ULONG(max_pm_join_result_count, PLUGIN_VAR_RQCMDARG,
"The maximum size of the join result for the single block on BPP.", NULL, NULL,
1048576, 1, ~0U, 1);
static MYSQL_THDVAR_ULONG(um_mem_limit, PLUGIN_VAR_RQCMDARG, static MYSQL_THDVAR_ULONG(um_mem_limit, PLUGIN_VAR_RQCMDARG,
"Per user Memory limit(MB). Switch to disk-based JOIN when limit is reached", NULL, "Per user Memory limit(MB). Switch to disk-based JOIN when limit is reached", NULL,
NULL, 0, 0, ~0U, 1); NULL, 0, 0, ~0U, 1);
@ -233,6 +237,7 @@ st_mysql_sys_var* mcs_system_variables[] = {MYSQL_SYSVAR(compression_type),
MYSQL_SYSVAR(diskjoin_bucketsize), MYSQL_SYSVAR(diskjoin_bucketsize),
MYSQL_SYSVAR(diskjoin_max_partition_tree_depth), MYSQL_SYSVAR(diskjoin_max_partition_tree_depth),
MYSQL_SYSVAR(diskjoin_force_run), MYSQL_SYSVAR(diskjoin_force_run),
MYSQL_SYSVAR(max_pm_join_result_count),
MYSQL_SYSVAR(um_mem_limit), MYSQL_SYSVAR(um_mem_limit),
MYSQL_SYSVAR(double_for_decimal_math), MYSQL_SYSVAR(double_for_decimal_math),
MYSQL_SYSVAR(decimal_overflow_check), MYSQL_SYSVAR(decimal_overflow_check),
@ -448,6 +453,15 @@ void set_diskjoin_force_run(THD* thd, bool value)
THDVAR(thd, diskjoin_force_run) = value; THDVAR(thd, diskjoin_force_run) = value;
} }
// Returns the session value of 'columnstore_max_pm_join_result_count',
// or 0 when no THD is available (mirrors the other get_* helpers here,
// e.g. get_um_mem_limit).
ulong get_max_pm_join_result_count(THD* thd)
{
  return (thd == nullptr) ? 0 : THDVAR(thd, max_pm_join_result_count);
}
// Sets the session variable 'columnstore_max_pm_join_result_count' on the
// given THD.
// NOTE(review): unlike the getter above, there is no NULL-thd guard here --
// this matches the surrounding set_* helpers (e.g. set_diskjoin_force_run);
// callers must pass a valid THD.
void set_max_pm_join_result_count(THD* thd, ulong value)
{
THDVAR(thd, max_pm_join_result_count) = value;
}
ulong get_um_mem_limit(THD* thd) ulong get_um_mem_limit(THD* thd)
{ {
return (thd == NULL) ? 0 : THDVAR(thd, um_mem_limit); return (thd == NULL) ? 0 : THDVAR(thd, um_mem_limit);

View File

@ -114,6 +114,9 @@ void set_diskjoin_force_run(THD* thd, bool value);
ulong get_diskjoin_max_partition_tree_depth(THD* thd); ulong get_diskjoin_max_partition_tree_depth(THD* thd);
void set_diskjoin_max_partition_tree_depth(THD* thd, ulong value); void set_diskjoin_max_partition_tree_depth(THD* thd, ulong value);
ulong get_max_pm_join_result_count(THD* thd);
void set_max_pm_join_result_count(THD* thd, ulong value);
ulong get_um_mem_limit(THD* thd); ulong get_um_mem_limit(THD* thd);
void set_um_mem_limit(THD* thd, ulong value); void set_um_mem_limit(THD* thd, ulong value);

View File

@ -0,0 +1,36 @@
DROP DATABASE IF EXISTS mcol_5522;
CREATE DATABASE mcol_5522;
USE mcol_5522;
create table t1 (a int) engine=columnstore;
create table t2 (a int) engine=columnstore;
insert into t1 values (1), (2), (3), (4);
insert into t2 values (1), (2), (3), (4);
create table t3 (a varchar(200)) engine=columnstore;
create table t4 (a varchar(200)) engine=columnstore;
insert into t3 values ("one"), ("two"), ("three");
insert into t4 values ("one"), ("two"), ("three");
set session columnstore_max_pm_join_result_count=1;
select * from t1, t2 where t1.a = t2.a;
a a
1 1
2 2
3 3
4 4
select * from t3, t4 where t3.a = t4.a;
a a
one one
two two
three three
set session columnstore_max_pm_join_result_count=1048576;
select * from t1, t2 where t1.a = t2.a;
a a
1 1
2 2
3 3
4 4
select * from t3, t4 where t3.a = t4.a;
a a
one one
two two
three three
DROP DATABASE mcol_5522;

View File

@ -0,0 +1,29 @@
# MCOL-5522: verify that joins return complete results even when the PM join
# result count limit forces the join to be processed in multiple passes.
--source ../include/have_columnstore.inc
--disable_warnings
DROP DATABASE IF EXISTS mcol_5522;
--enable_warnings
CREATE DATABASE mcol_5522;
USE mcol_5522;
# Integer-keyed join pair.
create table t1 (a int) engine=columnstore;
create table t2 (a int) engine=columnstore;
insert into t1 values (1), (2), (3), (4);
insert into t2 values (1), (2), (3), (4);
# Varchar-keyed join pair (exercises the typeless-join path).
create table t3 (a varchar(200)) engine=columnstore;
create table t4 (a varchar(200)) engine=columnstore;
insert into t3 values ("one"), ("two"), ("three");
insert into t4 values ("one"), ("two"), ("three");
# Force the smallest possible limit so the join must be split across
# multiple BPP passes; the result sets must still be complete.
set session columnstore_max_pm_join_result_count=1;
select * from t1, t2 where t1.a = t2.a;
select * from t3, t4 where t3.a = t4.a;
# Restore the default limit and confirm identical results.
set session columnstore_max_pm_join_result_count=1048576;
select * from t1, t2 where t1.a = t2.a;
select * from t3, t4 where t3.a = t4.a;
--disable_warnings
DROP DATABASE mcol_5522;
--enable_warnings

View File

@ -282,7 +282,6 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
if (ot == ROW_GROUP) if (ot == ROW_GROUP)
{ {
bs >> outputRG; bs >> outputRG;
// outputRG.setUseStringTable(true);
bs >> tmp8; bs >> tmp8;
if (tmp8) if (tmp8)
@ -305,11 +304,12 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
if (doJoin) if (doJoin)
{ {
pthread_mutex_lock(&objLock); pthread_mutex_lock(&objLock);
bs >> maxPmJoinResultCount;
maxPmJoinResultCount = std::max(maxPmJoinResultCount, (uint32_t)1);
if (ot == ROW_GROUP) if (ot == ROW_GROUP)
{ {
bs >> joinerCount; bs >> joinerCount;
// cout << "joinerCount = " << joinerCount << endl;
joinTypes.reset(new JoinType[joinerCount]); joinTypes.reset(new JoinType[joinerCount]);
tJoiners.reset(new std::shared_ptr<boost::shared_ptr<TJoiner>[]>[joinerCount]); tJoiners.reset(new std::shared_ptr<boost::shared_ptr<TJoiner>[]>[joinerCount]);
@ -349,8 +349,6 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
uint32_t tmp32; uint32_t tmp32;
bs >> tmp32; bs >> tmp32;
tJoinerSizes[i] = tmp32; tJoinerSizes[i] = tmp32;
// bs >> tJoinerSizes[i];
// cout << "joiner size = " << tJoinerSizes[i] << endl;
bs >> joinTypes[i]; bs >> joinTypes[i];
bs >> tmp8; bs >> tmp8;
typelessJoin[i] = (bool)tmp8; typelessJoin[i] = (bool)tmp8;
@ -369,7 +367,6 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
{ {
bs >> joinNullValues[i]; bs >> joinNullValues[i];
bs >> largeSideKeyColumns[i]; bs >> largeSideKeyColumns[i];
// cout << "large side key is " << largeSideKeyColumns[i] << endl;
for (uint j = 0; j < processorThreads; ++j) for (uint j = 0; j < processorThreads; ++j)
tJoiners[i][j].reset(new TJoiner(10, TupleJoiner::hasher())); tJoiners[i][j].reset(new TJoiner(10, TupleJoiner::hasher()));
} }
@ -410,8 +407,6 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
if (getTupleJoinRowGroupData) if (getTupleJoinRowGroupData)
{ {
deserializeVector(bs, smallSideRGs); deserializeVector(bs, smallSideRGs);
// cout << "deserialized " << smallSideRGs.size() << " small-side
// rowgroups\n";
idbassert(smallSideRGs.size() == joinerCount); idbassert(smallSideRGs.size() == joinerCount);
smallSideRowLengths.reset(new uint32_t[joinerCount]); smallSideRowLengths.reset(new uint32_t[joinerCount]);
smallSideRowData.reset(new RGData[joinerCount]); smallSideRowData.reset(new RGData[joinerCount]);
@ -424,9 +419,6 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
smallSideRowLengths[i] = smallSideRGs[i].getRowSize(); smallSideRowLengths[i] = smallSideRGs[i].getRowSize();
; ;
smallSideRowData[i] = RGData(smallSideRGs[i], tJoinerSizes[i]); smallSideRowData[i] = RGData(smallSideRGs[i], tJoinerSizes[i]);
// smallSideRowData[i].reset(new uint8_t[
// smallSideRGs[i].getEmptySize() +
// (uint64_t) smallSideRowLengths[i] * tJoinerSizes[i]]);
smallSideRGs[i].setData(&smallSideRowData[i]); smallSideRGs[i].setData(&smallSideRowData[i]);
smallSideRGs[i].resetRowGroup(0); smallSideRGs[i].resetRowGroup(0);
ssrdPos[i] = smallSideRGs[i].getEmptySize(); ssrdPos[i] = smallSideRGs[i].getEmptySize();
@ -446,7 +438,6 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
bs >> largeSideRG; bs >> largeSideRG;
bs >> joinedRG; bs >> joinedRG;
// cout << "got the joined Rowgroup: " << joinedRG.toString() << "\n";
} }
} }
@ -457,13 +448,11 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
bs >> filterCount; bs >> filterCount;
filterSteps.resize(filterCount); filterSteps.resize(filterCount);
// cout << "deserializing " << filterCount << " filters\n";
hasScan = false; hasScan = false;
hasPassThru = false; hasPassThru = false;
for (i = 0; i < filterCount; ++i) for (i = 0; i < filterCount; ++i)
{ {
// cout << "deserializing step " << i << endl;
filterSteps[i] = SCommand(Command::makeCommand(bs, &type, filterSteps)); filterSteps[i] = SCommand(Command::makeCommand(bs, &type, filterSteps));
if (type == Command::COLUMN_COMMAND) if (type == Command::COLUMN_COMMAND)
@ -488,12 +477,10 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
} }
bs >> projectCount; bs >> projectCount;
// cout << "deserializing " << projectCount << " projected columns\n\n";
projectSteps.resize(projectCount); projectSteps.resize(projectCount);
for (i = 0; i < projectCount; ++i) for (i = 0; i < projectCount; ++i)
{ {
// cout << "deserializing step " << i << endl;
projectSteps[i] = SCommand(Command::makeCommand(bs, &type, projectSteps)); projectSteps[i] = SCommand(Command::makeCommand(bs, &type, projectSteps));
if (type == Command::PASS_THRU) if (type == Command::PASS_THRU)
@ -1165,21 +1152,20 @@ void BatchPrimitiveProcessor::initProcessor()
asyncLoaded.reset(new bool[projectCount + 2]); asyncLoaded.reset(new bool[projectCount + 2]);
} }
/* This version does a join on projected rows */ // This version does a join on projected rows
// In order to prevent super size result sets in the case of near cartesian joins on three or more joins, // In order to prevent super size result sets in the case of near cartesian joins on three or more joins,
// the startRid (start at 0) is used to begin the rid loop and if we cut off processing early because of // the startRid (start at 0) is used to begin the rid loop and if we cut off processing early because of
// the size of the result set, we return the next rid to start with. If we finish ridCount rids, return 0. // the size of the result set, we return the next rid to start with. If we finish ridCount rids, return 0.
uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid) uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid, RowGroup& largeSideRowGroup)
{ {
uint32_t newRowCount = 0, i, j; uint32_t newRowCount = 0, i, j;
vector<uint32_t> matches; vector<uint32_t> matches;
uint64_t largeKey; uint64_t largeKey;
uint64_t resultCount = 0; uint64_t resultCount = 0;
uint32_t newStartRid = startRid; uint32_t newStartRid = startRid;
outputRG.getRow(0, &oldRow); largeSideRowGroup.getRow(startRid, &oldRow);
outputRG.getRow(0, &newRow); outputRG.getRow(0, &newRow);
// cout << "before join, RG has " << outputRG.getRowCount() << " BPP ridcount= " << ridCount << endl;
// ridCount gets modified based on the number of Rids actually processed during this call. // ridCount gets modified based on the number of Rids actually processed during this call.
// origRidCount is the number of rids for this thread after filter, which are the total // origRidCount is the number of rids for this thread after filter, which are the total
// number of rids to be processed from all calls to this function during this thread. // number of rids to be processed from all calls to this function during this thread.
@ -1196,7 +1182,6 @@ uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid)
* are NULL values to match against, but there is no filter, all rows can be eliminated. * are NULL values to match against, but there is no filter, all rows can be eliminated.
*/ */
// cout << "large side row: " << oldRow.toString() << endl;
for (j = 0; j < joinerCount; j++) for (j = 0; j < joinerCount; j++)
{ {
bool found; bool found;
@ -1211,7 +1196,6 @@ uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid)
if (LIKELY(!typelessJoin[j])) if (LIKELY(!typelessJoin[j]))
{ {
// cout << "not typeless join\n";
bool isNull; bool isNull;
uint32_t colIndex = largeSideKeyColumns[j]; uint32_t colIndex = largeSideKeyColumns[j];
@ -1235,16 +1219,11 @@ uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid)
((joinTypes[j] & ANTI) && !joinerIsEmpty && ((joinTypes[j] & ANTI) && !joinerIsEmpty &&
((isNull && (joinTypes[j] & MATCHNULLS)) || (found && !isNull)))) ((isNull && (joinTypes[j] & MATCHNULLS)) || (found && !isNull))))
{ {
// cout << " - not in the result set\n";
break; break;
} }
// else
// cout << " - in the result set\n";
} }
else else
{ {
// cout << " typeless join\n";
// the null values are not sent by UM in typeless case. null -> !found // the null values are not sent by UM in typeless case. null -> !found
TypelessData tlLargeKey(&oldRow); TypelessData tlLargeKey(&oldRow);
uint bucket = oldRow.hashTypeless(tlLargeSideKeyColumns[j], mSmallSideKeyColumnsPtr, uint bucket = oldRow.hashTypeless(tlLargeSideKeyColumns[j], mSmallSideKeyColumnsPtr,
@ -1254,9 +1233,7 @@ uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid)
if ((!found && !(joinTypes[j] & (LARGEOUTER | ANTI))) || (joinTypes[j] & ANTI)) if ((!found && !(joinTypes[j] & (LARGEOUTER | ANTI))) || (joinTypes[j] & ANTI))
{ {
/* Separated the ANTI join logic for readability. // Separated the ANTI join logic for readability.
*
*/
if (joinTypes[j] & ANTI) if (joinTypes[j] & ANTI)
{ {
if (found) if (found)
@ -1321,9 +1298,6 @@ uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid)
else else
{ {
smallSideRGs[j].getRow(tSmallSideMatches[j][newRowCount][k], &smallRows[j]); smallSideRGs[j].getRow(tSmallSideMatches[j][newRowCount][k], &smallRows[j]);
// uint64_t rowOffset = ((uint64_t) tSmallSideMatches[j][newRowCount][k]) *
// smallRows[j].getSize() + smallSideRGs[j].getEmptySize();
// smallRows[j].setData(&smallSideRowData[j][rowOffset]);
} }
applyMapping(joinFEMappings[j], smallRows[j], &joinFERow); applyMapping(joinFEMappings[j], smallRows[j], &joinFERow);
@ -1396,44 +1370,28 @@ uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid)
wide128Values[newRowCount] = wide128Values[i]; wide128Values[newRowCount] = wide128Values[i];
relRids[newRowCount] = relRids[i]; relRids[newRowCount] = relRids[i];
copyRow(oldRow, &newRow); copyRow(oldRow, &newRow);
// cout << "joined row: " << newRow.toString() << endl;
} }
newRowCount++; newRowCount++;
newRow.nextRow(); newRow.nextRow();
} }
// else
// cout << "j != joinerCount\n";
} }
// If we've accumulated more than maxResultCount -- 1048576 (2^20)_ of resultCounts, cut off processing. // If we've accumulated more than `maxPmJoinResultCount` of `resultCounts`, cut off processing.
// The caller will restart to continue where we left off. // The caller will restart to continue where we left off.
if (resultCount >= maxResultCount) if (resultCount >= maxPmJoinResultCount)
{ {
// FIXME: Implement proper pipleline. (MCOL-5522). // New start rid is a next row for large side.
cerr << "BPP join match count exceeded the limit, match count: " << resultCount << endl; newStartRid = i + 1;
resultCount = 0; break;
} }
} }
if (resultCount < maxResultCount) if (resultCount < maxPmJoinResultCount)
newStartRid = 0; newStartRid = 0;
ridCount = newRowCount; ridCount = newRowCount;
outputRG.setRowCount(ridCount); outputRG.setRowCount(ridCount);
/* prints out the whole result set.
if (ridCount != 0) {
cout << "RG rowcount=" << outputRG.getRowCount() << " BPP ridcount=" << ridCount << endl;
for (i = 0; i < joinerCount; i++) {
for (j = 0; j < ridCount; j++) {
cout << "joiner " << i << " has " << tSmallSideMatches[i][j].size() << "
entries" << endl; cout << "row " << j << ":"; for (uint32_t k = 0; k < tSmallSideMatches[i][j].size();
k++) cout << " " << tSmallSideMatches[i][j][k]; cout << endl;
}
cout << endl;
}
}
*/
return newStartRid; return newStartRid;
} }
@ -1780,16 +1738,24 @@ void BatchPrimitiveProcessor::execute()
} }
} }
// Duplicate projected `RGData` to `large side` row group.
// We create a `large side` row group from `output` row group,
// to save an original data, because 'output` row group is used
// to store matched rows from small side.
RGData largeSideRGData = outputRG.duplicate();
RowGroup largeSideRowGroup = outputRG;
largeSideRowGroup.setData(&largeSideRGData);
do // while (startRid > 0) do // while (startRid > 0)
{ {
#ifdef PRIMPROC_STOPWATCH #ifdef PRIMPROC_STOPWATCH
stopwatch->start("-- executeTupleJoin()"); stopwatch->start("-- executeTupleJoin()");
startRid = executeTupleJoin(startRid); startRid = executeTupleJoin(startRid, largeSideRowGroup);
stopwatch->stop("-- executeTupleJoin()"); stopwatch->stop("-- executeTupleJoin()");
#else #else
startRid = executeTupleJoin(startRid); startRid = executeTupleJoin(startRid, largeSideRowGroup);
// sStartRid = startRid;
#endif #endif
/* project the non-key columns */ /* project the non-key columns */
for (j = 0; j < projectCount; ++j) for (j = 0; j < projectCount; ++j)
{ {
@ -2865,7 +2831,6 @@ void BatchPrimitiveProcessor::buildVSSCache(uint32_t loopCount)
for (i = 0; i < vssData.size(); i++) for (i = 0; i < vssData.size(); i++)
vssCache.insert(make_pair(lbidList[i], vssData[i])); vssCache.insert(make_pair(lbidList[i], vssData[i]));
// cout << "buildVSSCache inserted " << vssCache.size() << " elements" << endl;
} }
} // namespace primitiveprocessor } // namespace primitiveprocessor

View File

@ -336,7 +336,7 @@ class BatchPrimitiveProcessor
std::shared_ptr<std::shared_ptr<boost::shared_ptr<TJoiner>[]>[]> tJoiners; std::shared_ptr<std::shared_ptr<boost::shared_ptr<TJoiner>[]>[]> tJoiners;
typedef std::vector<uint32_t> MatchedData[LOGICAL_BLOCK_RIDS]; typedef std::vector<uint32_t> MatchedData[LOGICAL_BLOCK_RIDS];
std::shared_ptr<MatchedData[]> tSmallSideMatches; std::shared_ptr<MatchedData[]> tSmallSideMatches;
uint32_t executeTupleJoin(uint32_t startRid); uint32_t executeTupleJoin(uint32_t startRid, rowgroup::RowGroup& largeSideRowGroup);
bool getTupleJoinRowGroupData; bool getTupleJoinRowGroupData;
std::vector<rowgroup::RowGroup> smallSideRGs; std::vector<rowgroup::RowGroup> smallSideRGs;
rowgroup::RowGroup largeSideRG; rowgroup::RowGroup largeSideRG;
@ -438,7 +438,7 @@ class BatchPrimitiveProcessor
bool initiatedByEM_; bool initiatedByEM_;
uint32_t weight_; uint32_t weight_;
static const uint64_t maxResultCount = 1048576; // 2^20 uint32_t maxPmJoinResultCount = 1048576;
friend class Command; friend class Command;
friend class ColumnCommand; friend class ColumnCommand;
friend class DictStep; friend class DictStep;