1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-5522 Properly process pm join result count.

This patch:
1. Properly handles the situation when the PM join result count limit is exceeded.
2. Adds the session variable `columnstore_max_pm_join_result_count` to control the limit.
This commit is contained in:
Denis Khalikov
2023-07-20 15:48:29 +03:00
committed by Leonid Fedorov
parent 9f15a0a9ac
commit 5f07828619
17 changed files with 195 additions and 102 deletions

View File

@ -85,6 +85,7 @@ CalpontSelectExecutionPlan::CalpontSelectExecutionPlan(const int location)
, fDJSSmallSideLimit(0)
, fDJSLargeSideLimit(0)
, fDJSPartitionSize(100 * 1024 * 1024)  // 100MB mem usage for disk based join
, fMaxPmJoinResultCount(1048576)  // default PM join result count limit (1024 * 1024)
,
fUMMemLimit(numeric_limits<int64_t>::max())
, fIsDML(false)
@ -498,6 +499,7 @@ void CalpontSelectExecutionPlan::serialize(messageqcpp::ByteStream& b) const
b << fDJSSmallSideLimit;
b << fDJSLargeSideLimit;
b << fDJSPartitionSize;
b << (uint32_t)fMaxPmJoinResultCount;
b << fUMMemLimit;
b << (uint8_t)fIsDML;
messageqcpp::ByteStream::octbyte timeZone = fTimeZone;
@ -693,6 +695,7 @@ void CalpontSelectExecutionPlan::unserialize(messageqcpp::ByteStream& b)
b >> fDJSSmallSideLimit;
b >> fDJSLargeSideLimit;
b >> fDJSPartitionSize;
b >> (uint32_t&)fMaxPmJoinResultCount;
b >> fUMMemLimit;
b >> tmp8;
fIsDML = tmp8;

View File

@ -688,6 +688,15 @@ class CalpontSelectExecutionPlan : public CalpontExecutionPlan
return fDJSPartitionSize;
}
/** @brief Set the PM join result count limit carried by this execution plan.
 *
 * @param value new limit; stored as-is in fMaxPmJoinResultCount (no validation here).
 */
void maxPmJoinResultCount(uint32_t value)
{
fMaxPmJoinResultCount = value;
}
/** @brief Get the PM join result count limit carried by this execution plan.
 *
 * @return the current value of fMaxPmJoinResultCount.
 *
 * NOTE(review): this accessor does not modify the object but is not
 * const-qualified, so it cannot be called through a const plan — consider
 * adding `const`.
 */
uint32_t maxPmJoinResultCount()
{
return fMaxPmJoinResultCount;
}
void umMemLimit(uint64_t l)
{
fUMMemLimit = l;
@ -917,14 +926,13 @@ class CalpontSelectExecutionPlan : public CalpontExecutionPlan
boost::uuids::uuid fUuid;
/* Disk-based join vars */
uint64_t fDJSSmallSideLimit;
uint64_t fDJSLargeSideLimit;
uint64_t fDJSPartitionSize;
int64_t fUMMemLimit;
bool fIsDML;
long fTimeZone;
uint64_t fDJSSmallSideLimit = 0;
uint64_t fDJSLargeSideLimit = 0;
uint64_t fDJSPartitionSize = 100 * 1024 * 1024;
uint32_t fMaxPmJoinResultCount = 1048576;
int64_t fUMMemLimit = numeric_limits<int64_t>::max();
bool fIsDML = false;
long fTimeZone = 0;
std::vector<execplan::ParseTree*> fDynamicParseTreeVec;
};

View File

@ -1105,6 +1105,7 @@ void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const
/* if HAS_JOINER, send the init params */
if (flags & HAS_JOINER)
{
bs << (uint32_t)maxPmJoinResultCount;
if (ot == ROW_GROUP)
{
idbassert(tJoiners.size() > 0);

View File

@ -252,6 +252,11 @@ class BatchPrimitiveProcessorJL
uuid = u;
}
/** @brief Set the PM join result count limit for this BPP job-list object.
 *
 * The stored value is what createBPP() writes to the ByteStream when the
 * HAS_JOINER flag is set.
 *
 * @param count new limit; stored as-is (no validation here).
 */
void setMaxPmJoinResultCount(uint32_t count)
{
maxPmJoinResultCount = count;
}
private:
const size_t perColumnProjectWeight_ = 10;
const size_t perColumnFilteringWeight_ = 10;
@ -374,6 +379,7 @@ class BatchPrimitiveProcessorJL
unsigned fJoinerChunkSize;
uint32_t dbRoot;
bool hasSmallOuterJoin;
uint32_t maxPmJoinResultCount = 1048576;
uint32_t _priority;

View File

@ -211,6 +211,7 @@ struct JobInfo
, wfqLimitStart(0)
, wfqLimitCount(-1)
, timeZone(0)
, maxPmJoinResultCount(1048576)
{
}
ResourceManager* rm;
@ -366,6 +367,7 @@ struct JobInfo
uint64_t partitionSize;
bool isDML;
long timeZone;
uint32_t maxPmJoinResultCount;
// This is for tracking any dynamically allocated ParseTree objects
// in simpleScalarFilterToParseTree() for later deletion in

View File

@ -22,7 +22,7 @@
#include <stack>
#include <iterator>
#include <algorithm>
//#define NDEBUG
// #define NDEBUG
#include <cassert>
#include <vector>
#include <set>
@ -172,7 +172,7 @@ void projectSimpleColumn(const SimpleColumn* sc, JobStepVector& jsv, JobInfo& jo
// This is a double-step step
// if (jobInfo.trace)
// cout << "doProject Emit pGetSignature for SimpleColumn " << dictOid <<
//endl;
// endl;
pds = new pDictionaryStep(dictOid, tbl_oid, ct, jobInfo);
jobInfo.keyInfo->dictOidToColOid[dictOid] = oid;
@ -2066,6 +2066,7 @@ SJLP makeJobList_(CalpontExecutionPlan* cplan, ResourceManager* rm,
jobInfo.smallSideLimit = csep->djsSmallSideLimit();
jobInfo.largeSideLimit = csep->djsLargeSideLimit();
jobInfo.partitionSize = csep->djsPartitionSize();
jobInfo.maxPmJoinResultCount = csep->maxPmJoinResultCount();
jobInfo.umMemLimit.reset(new int64_t);
*(jobInfo.umMemLimit) = csep->umMemLimit();
jobInfo.isDML = csep->isDML();

View File

@ -91,6 +91,7 @@ JobStep::JobStep(const JobInfo& j)
, fProgress(0)
, fStartTime(-1)
, fTimeZone(j.timeZone)
, fMaxPmJoinResultCount(j.maxPmJoinResultCount)
{
QueryTeleServerParms tsp;
string teleServerHost(Config::makeConfig()->getConfig("QueryTele", "Host"));

View File

@ -497,6 +497,7 @@ class JobStep
int64_t fStartTime;
int64_t fLastStepTeleTime;
long fTimeZone;
uint32_t fMaxPmJoinResultCount;
private:
static boost::mutex fLogMutex;

View File

@ -1203,6 +1203,11 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep
return bRunFEonPM;
}
/** @brief Store a PM join result count limit in this TupleBPS.
 *
 * @param count new limit; stored as-is in the maxPmJoinResultCount member.
 *
 * NOTE(review): the maxPmJoinResultCount member is declared without an
 * initializer, and the visible part of TupleBPS::run() forwards
 * fMaxPmJoinResultCount to fBPP rather than this member — confirm whether
 * this setter and its member are actually consumed anywhere.
 */
void setMaxPmJoinResultCount(uint32_t count)
{
maxPmJoinResultCount = count;
}
protected:
void sendError(uint16_t status);
@ -1344,6 +1349,8 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep
boost::shared_ptr<RowGroupDL> deliveryDL;
uint32_t deliveryIt;
uint32_t maxPmJoinResultCount;
class JoinLocalData
{
public:

View File

@ -1458,9 +1458,13 @@ void TupleBPS::run()
fBPP->setThreadCount(fMaxNumProcessorThreads);
if (doJoin)
{
for (i = 0; i < smallSideCount; i++)
tjoiners[i]->setThreadCount(fMaxNumProcessorThreads);
fBPP->setMaxPmJoinResultCount(fMaxPmJoinResultCount);
}
if (fe1)
fBPP->setFEGroup1(fe1, fe1Input);

View File

@ -1881,7 +1881,8 @@ bool buildPredicateItem(Item_func* ifp, gp_walk_info* gwip)
}
}
if (get_fe_conn_info_ptr() == NULL) {
if (get_fe_conn_info_ptr() == NULL)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(current_thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -2796,7 +2797,8 @@ void setError(THD* thd, uint32_t errcode, string errmsg)
thd->raise_error_printf(errcode, errmsg.c_str());
// reset expressionID
if (get_fe_conn_info_ptr() == NULL) {
if (get_fe_conn_info_ptr() == NULL)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(current_thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -3608,7 +3610,8 @@ ReturnedColumn* buildBooleanConstantColumn(Item* item, gp_walk_info& gwi, bool&
ArithmeticColumn* buildArithmeticColumn(Item_func* item, gp_walk_info& gwi, bool& nonSupport)
{
if (get_fe_conn_info_ptr() == NULL) {
if (get_fe_conn_info_ptr() == NULL)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(current_thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -3871,7 +3874,8 @@ ArithmeticColumn* buildArithmeticColumn(Item_func* item, gp_walk_info& gwi, bool
ReturnedColumn* buildFunctionColumn(Item_func* ifp, gp_walk_info& gwi, bool& nonSupport, bool selectBetweenIn)
{
if (get_fe_conn_info_ptr() == NULL) {
if (get_fe_conn_info_ptr() == NULL)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(current_thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -3964,7 +3968,8 @@ ReturnedColumn* buildFunctionColumn(Item_func* ifp, gp_walk_info& gwi, bool& non
nonSupport = true;
gwi.fatalParseError = true;
Message::Args args;
string info = funcName + " with argument count > " + std::to_string(std::numeric_limits<uint16_t>::max());
string info =
funcName + " with argument count > " + std::to_string(std::numeric_limits<uint16_t>::max());
args.add(info);
gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_NON_SUPPORTED_FUNCTION, args);
return NULL;
@ -4492,7 +4497,8 @@ ReturnedColumn* buildFunctionColumn(Item_func* ifp, gp_walk_info& gwi, bool& non
FunctionColumn* buildCaseFunction(Item_func* item, gp_walk_info& gwi, bool& nonSupport)
{
if (get_fe_conn_info_ptr() == NULL) {
if (get_fe_conn_info_ptr() == NULL)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(current_thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -4920,7 +4926,8 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
vector<SRCP> orderCols;
ConstArgParam constArgParam;
if (get_fe_conn_info_ptr() == NULL) {
if (get_fe_conn_info_ptr() == NULL)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(current_thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -5951,7 +5958,9 @@ void gp_walk(const Item* item, void* arg)
// bug 3137. If filter constant like 1=0, put it to ptWorkStack
// MariaDB bug 750. Breaks if compare is an argument to a function.
// if ((int32_t)gwip->rcWorkStack.size() <= (gwip->rcBookMarkStack.empty() ? 0
// if ((int32_t)gwip->rcWorkStack.size() <= (gwip->rcBookMarkStack.empty()
//?
// 0
//: gwip->rcBookMarkStack.top())
// && isPredicateFunction(ifp, gwip))
if (isPredicateFunction(ifp, gwip))
@ -6223,12 +6232,13 @@ void gp_walk(const Item* item, void* arg)
if (operand)
{
gwip->rcWorkStack.push(operand);
if (i == 0 && gwip->scsp == NULL) // first item is the WHEN LHS
if (i == 0 && gwip->scsp == NULL) // first item is the WHEN LHS
{
SimpleColumn* sc = dynamic_cast<SimpleColumn*>(operand);
if (sc)
{
gwip->scsp.reset(sc->clone()); // We need to clone else sc gets double deleted. This code is rarely executed so the cost is acceptable.
gwip->scsp.reset(sc->clone()); // We need to clone else sc gets double deleted. This code is
// rarely executed so the cost is acceptable.
}
}
}
@ -6686,7 +6696,7 @@ void setExecutionParams(gp_walk_info& gwi, SCSEP& csep)
csep->djsSmallSideLimit(get_diskjoin_smallsidelimit(gwi.thd) * 1024ULL * 1024);
csep->djsLargeSideLimit(get_diskjoin_largesidelimit(gwi.thd) * 1024ULL * 1024);
csep->djsPartitionSize(get_diskjoin_bucketsize(gwi.thd) * 1024ULL * 1024);
csep->maxPmJoinResultCount(get_max_pm_join_result_count(gwi.thd));
if (get_um_mem_limit(gwi.thd) == 0)
csep->umMemLimit(numeric_limits<int64_t>::max());
else
@ -7405,8 +7415,7 @@ void buildInToExistsFilter(gp_walk_info& gwi, SELECT_LEX& select_lex)
* error id as an int
***********************************************************/
int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool isUnion,
bool isSelectHandlerTop, bool isSelectLexUnit,
const std::vector<COND*>& condStack)
bool isSelectHandlerTop, bool isSelectLexUnit, const std::vector<COND*>& condStack)
{
#ifdef DEBUG_WALK_COND
cerr << "getSelectPlan()" << endl;
@ -7434,8 +7443,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i
CalpontSelectExecutionPlan::SelectList derivedTbList;
// @bug 1796. Remember table order on the FROM list.
gwi.clauseType = FROM;
if ((rc = processFrom(isUnion, select_lex, gwi, csep, isSelectHandlerTop,
isSelectLexUnit)))
if ((rc = processFrom(isUnion, select_lex, gwi, csep, isSelectHandlerTop, isSelectLexUnit)))
{
return rc;
}
@ -7900,7 +7908,8 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i
gwi.returnedCols[i]->hasAggregate(true);
}
gwi.returnedCols[i]->resultType(CalpontSystemCatalog::ColType::convertUnionColType(coltypes, unionedTypeRc));
gwi.returnedCols[i]->resultType(
CalpontSystemCatalog::ColType::convertUnionColType(coltypes, unionedTypeRc));
if (unionedTypeRc != 0)
{
@ -9132,14 +9141,14 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro
{
// MCOL-1052 The condition could be useless.
// MariaDB bug 624 - without the fix_fields call, delete with join may error with "No query step".
//#if MYSQL_VERSION_ID < 50172
// #if MYSQL_VERSION_ID < 50172
//@bug 3039. fix fields for constants
if (!icp->fixed())
{
icp->fix_fields(gwi.thd, (Item**)&icp);
}
//#endif
// #endif
gwi.fatalParseError = false;
#ifdef DEBUG_WALK_COND
cerr << "------------------ WHERE -----------------------" << endl;
@ -9733,7 +9742,8 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro
gwi.returnedCols[i]->hasAggregate(true);
}
gwi.returnedCols[i]->resultType(CalpontSystemCatalog::ColType::convertUnionColType(coltypes, unionedTypeRc));
gwi.returnedCols[i]->resultType(
CalpontSystemCatalog::ColType::convertUnionColType(coltypes, unionedTypeRc));
if (unionedTypeRc != 0)
{

View File

@ -39,8 +39,8 @@ static MYSQL_THDVAR_ENUM(compression_type, PLUGIN_VAR_RQCMDARG,
"SNAPPY segment files are Snappy compressed (default);"
#ifdef HAVE_LZ4
"LZ4 segment files are LZ4 compressed;",
# else
,
#else
,
#endif
NULL, // check
NULL, // update
@ -135,6 +135,10 @@ static MYSQL_THDVAR_ULONG(diskjoin_bucketsize, PLUGIN_VAR_RQCMDARG,
"The maximum size in MB of each 'small side' table in memory.", NULL, NULL, 100, 1,
~0U, 1);
// Per-session variable max_pm_join_result_count (exposed with the plugin
// prefix as columnstore_max_pm_join_result_count).
// Trailing numeric arguments, assuming the usual MYSQL_THDVAR_ULONG order
// (default, min, max, block size): default 1048576, min 1, max ~0U, step 1.
static MYSQL_THDVAR_ULONG(max_pm_join_result_count, PLUGIN_VAR_RQCMDARG,
"The maximum size of the join result for the single block on BPP.", NULL, NULL,
1048576, 1, ~0U, 1);
static MYSQL_THDVAR_ULONG(um_mem_limit, PLUGIN_VAR_RQCMDARG,
"Per user Memory limit(MB). Switch to disk-based JOIN when limit is reached", NULL,
NULL, 0, 0, ~0U, 1);
@ -191,21 +195,21 @@ static MYSQL_THDVAR_ULONGLONG(cache_flush_threshold, PLUGIN_VAR_RQCMDARG,
"Threshold on the number of rows in the cache to trigger a flush", NULL, NULL,
500000, 1, 1000000000, 1);
static MYSQL_THDVAR_STR(cmapi_host, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "CMAPI host", NULL, NULL,
static MYSQL_THDVAR_STR(cmapi_host, PLUGIN_VAR_NOCMDOPT | PLUGIN_VAR_MEMALLOC, "CMAPI host", NULL, NULL,
"https://localhost");
static MYSQL_THDVAR_STR(cmapi_version, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "CMAPI version", NULL, NULL,
static MYSQL_THDVAR_STR(cmapi_version, PLUGIN_VAR_NOCMDOPT | PLUGIN_VAR_MEMALLOC, "CMAPI version", NULL, NULL,
"0.4.0");
static MYSQL_THDVAR_STR(cmapi_key, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "CMAPI key", NULL, NULL,
"");
static MYSQL_THDVAR_STR(cmapi_key, PLUGIN_VAR_NOCMDOPT | PLUGIN_VAR_MEMALLOC, "CMAPI key", NULL, NULL, "");
static MYSQL_THDVAR_ULONGLONG(cmapi_port, PLUGIN_VAR_NOCMDOPT, "CMAPI port", NULL,
NULL, 8640, 100, 65356, 1);
static MYSQL_THDVAR_ULONGLONG(cmapi_port, PLUGIN_VAR_NOCMDOPT, "CMAPI port", NULL, NULL, 8640, 100, 65356, 1);
static MYSQL_THDVAR_STR(s3_key, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "S3 Authentication Key ", NULL, NULL, "");
static MYSQL_THDVAR_STR(s3_secret, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "S3 Authentication Secret", NULL, NULL, "");
static MYSQL_THDVAR_STR(s3_region, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "S3 region", NULL, NULL, "");
static MYSQL_THDVAR_STR(s3_key, PLUGIN_VAR_NOCMDOPT | PLUGIN_VAR_MEMALLOC, "S3 Authentication Key ", NULL,
NULL, "");
static MYSQL_THDVAR_STR(s3_secret, PLUGIN_VAR_NOCMDOPT | PLUGIN_VAR_MEMALLOC, "S3 Authentication Secret",
NULL, NULL, "");
static MYSQL_THDVAR_STR(s3_region, PLUGIN_VAR_NOCMDOPT | PLUGIN_VAR_MEMALLOC, "S3 region", NULL, NULL, "");
st_mysql_sys_var* mcs_system_variables[] = {MYSQL_SYSVAR(compression_type),
MYSQL_SYSVAR(fe_conn_info_ptr),
@ -224,6 +228,7 @@ st_mysql_sys_var* mcs_system_variables[] = {MYSQL_SYSVAR(compression_type),
MYSQL_SYSVAR(diskjoin_smallsidelimit),
MYSQL_SYSVAR(diskjoin_largesidelimit),
MYSQL_SYSVAR(diskjoin_bucketsize),
MYSQL_SYSVAR(max_pm_join_result_count),
MYSQL_SYSVAR(um_mem_limit),
MYSQL_SYSVAR(double_for_decimal_math),
MYSQL_SYSVAR(decimal_overflow_check),
@ -256,8 +261,10 @@ void* get_fe_conn_info_ptr(THD* thd)
/** @brief Store the front-end connection info pointer in the session variable.
 *
 * @param ptr  pointer to save; it is stored as a 64-bit integer in the
 *             fe_conn_info_ptr session variable.
 * @param thd  session to store into; when NULL, current_thd is used instead.
 *
 * If no session can be resolved (both thd and current_thd are NULL) the call
 * is a no-op.
 */
void set_fe_conn_info_ptr(void* ptr, THD* thd)
{
  // Fall back to current_thd when the caller did not supply a session.
  if (thd == NULL)
    thd = current_thd;

  // Still no session available: there is nowhere to store the pointer.
  if (thd == NULL)
    return;

  THDVAR(thd, fe_conn_info_ptr) = (uint64_t)(ptr);
}
@ -420,6 +427,15 @@ void set_diskjoin_bucketsize(THD* thd, ulong value)
THDVAR(thd, diskjoin_bucketsize) = value;
}
/** @brief Read the max_pm_join_result_count session variable.
 *
 * @param thd  session to read from.
 * @return the session's value, or 0 when thd is NULL.
 *
 * NOTE(review): the variable is declared with a minimum of 1, so the 0
 * returned for a NULL thd is outside its valid range — confirm callers never
 * pass NULL or treat 0 specially.
 */
ulong get_max_pm_join_result_count(THD* thd)
{
  // No session: report 0, mirroring the other getters in this file.
  if (thd == NULL)
    return 0;

  return THDVAR(thd, max_pm_join_result_count);
}
/** @brief Overwrite the max_pm_join_result_count session variable.
 *
 * @param thd    session to write to; it is used without a NULL check.
 * @param value  new value; assigned directly, no range checking is done here.
 */
void set_max_pm_join_result_count(THD* thd, ulong value)
{
THDVAR(thd, max_pm_join_result_count) = value;
}
ulong get_um_mem_limit(THD* thd)
{
return (thd == NULL) ? 0 : THDVAR(thd, um_mem_limit);

View File

@ -108,6 +108,9 @@ void set_diskjoin_largesidelimit(THD* thd, ulong value);
ulong get_diskjoin_bucketsize(THD* thd);
void set_diskjoin_bucketsize(THD* thd, ulong value);
// Accessors for the max_pm_join_result_count session variable.
// The getter yields 0 when thd is NULL; the setter assigns the value directly.
ulong get_max_pm_join_result_count(THD* thd);
void set_max_pm_join_result_count(THD* thd, ulong value);
ulong get_um_mem_limit(THD* thd);
void set_um_mem_limit(THD* thd, ulong value);