diff --git a/dbcon/execplan/calpontselectexecutionplan.cpp b/dbcon/execplan/calpontselectexecutionplan.cpp index cf7034a85..3b2a952a3 100644 --- a/dbcon/execplan/calpontselectexecutionplan.cpp +++ b/dbcon/execplan/calpontselectexecutionplan.cpp @@ -85,6 +85,7 @@ CalpontSelectExecutionPlan::CalpontSelectExecutionPlan(const int location) , fDJSSmallSideLimit(0) , fDJSLargeSideLimit(0) , fDJSPartitionSize(100 * 1024 * 1024) + , fMaxPmJoinResultCount(1048576) , // 100MB mem usage for disk based join, fUMMemLimit(numeric_limits::max()) , fIsDML(false) @@ -498,6 +499,7 @@ void CalpontSelectExecutionPlan::serialize(messageqcpp::ByteStream& b) const b << fDJSSmallSideLimit; b << fDJSLargeSideLimit; b << fDJSPartitionSize; + b << (uint32_t)fMaxPmJoinResultCount; b << fUMMemLimit; b << (uint8_t)fIsDML; messageqcpp::ByteStream::octbyte timeZone = fTimeZone; @@ -693,6 +695,7 @@ void CalpontSelectExecutionPlan::unserialize(messageqcpp::ByteStream& b) b >> fDJSSmallSideLimit; b >> fDJSLargeSideLimit; b >> fDJSPartitionSize; + b >> (uint32_t&)fMaxPmJoinResultCount; b >> fUMMemLimit; b >> tmp8; fIsDML = tmp8; diff --git a/dbcon/execplan/calpontselectexecutionplan.h b/dbcon/execplan/calpontselectexecutionplan.h index 10093f8ce..aea13fd34 100644 --- a/dbcon/execplan/calpontselectexecutionplan.h +++ b/dbcon/execplan/calpontselectexecutionplan.h @@ -688,6 +688,15 @@ class CalpontSelectExecutionPlan : public CalpontExecutionPlan return fDJSPartitionSize; } + void maxPmJoinResultCount(uint32_t value) + { + fMaxPmJoinResultCount = value; + } + uint32_t maxPmJoinResultCount() + { + return fMaxPmJoinResultCount; + } + void umMemLimit(uint64_t l) { fUMMemLimit = l; @@ -917,14 +926,13 @@ class CalpontSelectExecutionPlan : public CalpontExecutionPlan boost::uuids::uuid fUuid; /* Disk-based join vars */ - uint64_t fDJSSmallSideLimit; - uint64_t fDJSLargeSideLimit; - uint64_t fDJSPartitionSize; - int64_t fUMMemLimit; - bool fIsDML; - - long fTimeZone; - + uint64_t fDJSSmallSideLimit = 0; + 
uint64_t fDJSLargeSideLimit = 0; + uint64_t fDJSPartitionSize = 100 * 1024 * 1024; + uint32_t fMaxPmJoinResultCount = 1048576; + int64_t fUMMemLimit = numeric_limits::max(); + bool fIsDML = false; + long fTimeZone = 0; std::vector fDynamicParseTreeVec; }; diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.cpp b/dbcon/joblist/batchprimitiveprocessor-jl.cpp index b7fcaec02..d8f2943b6 100644 --- a/dbcon/joblist/batchprimitiveprocessor-jl.cpp +++ b/dbcon/joblist/batchprimitiveprocessor-jl.cpp @@ -1105,6 +1105,7 @@ void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const /* if HAS_JOINER, send the init params */ if (flags & HAS_JOINER) { + bs << (uint32_t)maxPmJoinResultCount; if (ot == ROW_GROUP) { idbassert(tJoiners.size() > 0); diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.h b/dbcon/joblist/batchprimitiveprocessor-jl.h index 6fbd31f63..365ea4ad0 100644 --- a/dbcon/joblist/batchprimitiveprocessor-jl.h +++ b/dbcon/joblist/batchprimitiveprocessor-jl.h @@ -252,6 +252,11 @@ class BatchPrimitiveProcessorJL uuid = u; } + void setMaxPmJoinResultCount(uint32_t count) + { + maxPmJoinResultCount = count; + } + private: const size_t perColumnProjectWeight_ = 10; const size_t perColumnFilteringWeight_ = 10; @@ -374,6 +379,7 @@ class BatchPrimitiveProcessorJL unsigned fJoinerChunkSize; uint32_t dbRoot; bool hasSmallOuterJoin; + uint32_t maxPmJoinResultCount = 1048576; uint32_t _priority; diff --git a/dbcon/joblist/jlf_common.h b/dbcon/joblist/jlf_common.h index a2c7a4881..c7e9fc0f2 100644 --- a/dbcon/joblist/jlf_common.h +++ b/dbcon/joblist/jlf_common.h @@ -211,6 +211,7 @@ struct JobInfo , wfqLimitStart(0) , wfqLimitCount(-1) , timeZone(0) + , maxPmJoinResultCount(1048576) { } ResourceManager* rm; @@ -366,6 +367,7 @@ struct JobInfo uint64_t partitionSize; bool isDML; long timeZone; + uint32_t maxPmJoinResultCount; // This is for tracking any dynamically allocated ParseTree objects // in simpleScalarFilterToParseTree() for later deletion in diff --git 
a/dbcon/joblist/joblistfactory.cpp b/dbcon/joblist/joblistfactory.cpp index 11308612e..3590c3082 100644 --- a/dbcon/joblist/joblistfactory.cpp +++ b/dbcon/joblist/joblistfactory.cpp @@ -22,7 +22,7 @@ #include #include #include -//#define NDEBUG +// #define NDEBUG #include #include #include @@ -172,7 +172,7 @@ void projectSimpleColumn(const SimpleColumn* sc, JobStepVector& jsv, JobInfo& jo // This is a double-step step // if (jobInfo.trace) // cout << "doProject Emit pGetSignature for SimpleColumn " << dictOid << - //endl; + // endl; pds = new pDictionaryStep(dictOid, tbl_oid, ct, jobInfo); jobInfo.keyInfo->dictOidToColOid[dictOid] = oid; @@ -2066,6 +2066,7 @@ SJLP makeJobList_(CalpontExecutionPlan* cplan, ResourceManager* rm, jobInfo.smallSideLimit = csep->djsSmallSideLimit(); jobInfo.largeSideLimit = csep->djsLargeSideLimit(); jobInfo.partitionSize = csep->djsPartitionSize(); + jobInfo.maxPmJoinResultCount = csep->maxPmJoinResultCount(); jobInfo.umMemLimit.reset(new int64_t); *(jobInfo.umMemLimit) = csep->umMemLimit(); jobInfo.isDML = csep->isDML(); diff --git a/dbcon/joblist/jobstep.cpp b/dbcon/joblist/jobstep.cpp index 8f42dff10..8968c339d 100644 --- a/dbcon/joblist/jobstep.cpp +++ b/dbcon/joblist/jobstep.cpp @@ -91,6 +91,7 @@ JobStep::JobStep(const JobInfo& j) , fProgress(0) , fStartTime(-1) , fTimeZone(j.timeZone) + , fMaxPmJoinResultCount(j.maxPmJoinResultCount) { QueryTeleServerParms tsp; string teleServerHost(Config::makeConfig()->getConfig("QueryTele", "Host")); diff --git a/dbcon/joblist/jobstep.h b/dbcon/joblist/jobstep.h index 362950f4c..3796524b5 100644 --- a/dbcon/joblist/jobstep.h +++ b/dbcon/joblist/jobstep.h @@ -497,6 +497,7 @@ class JobStep int64_t fStartTime; int64_t fLastStepTeleTime; long fTimeZone; + uint32_t fMaxPmJoinResultCount; private: static boost::mutex fLogMutex; diff --git a/dbcon/joblist/primitivestep.h b/dbcon/joblist/primitivestep.h index 74bc0428b..36652d3f7 100644 --- a/dbcon/joblist/primitivestep.h +++ 
b/dbcon/joblist/primitivestep.h @@ -1203,6 +1203,11 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep return bRunFEonPM; } + void setMaxPmJoinResultCount(uint32_t count) + { + maxPmJoinResultCount = count; + } + protected: void sendError(uint16_t status); @@ -1344,6 +1349,8 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep boost::shared_ptr deliveryDL; uint32_t deliveryIt; + uint32_t maxPmJoinResultCount = 1048576; // same default as BatchPrimitiveProcessorJL; never left indeterminate + class JoinLocalData { public: diff --git a/dbcon/joblist/tuple-bps.cpp b/dbcon/joblist/tuple-bps.cpp index 8d87374ef..3df7fad73 100644 --- a/dbcon/joblist/tuple-bps.cpp +++ b/dbcon/joblist/tuple-bps.cpp @@ -1458,9 +1458,13 @@ void TupleBPS::run() fBPP->setThreadCount(fMaxNumProcessorThreads); if (doJoin) + { for (i = 0; i < smallSideCount; i++) tjoiners[i]->setThreadCount(fMaxNumProcessorThreads); + fBPP->setMaxPmJoinResultCount(fMaxPmJoinResultCount); + } + if (fe1) fBPP->setFEGroup1(fe1, fe1Input); diff --git a/dbcon/mysql/ha_mcs_execplan.cpp b/dbcon/mysql/ha_mcs_execplan.cpp index 78f0dedf3..531f0bfa7 100644 --- a/dbcon/mysql/ha_mcs_execplan.cpp +++ b/dbcon/mysql/ha_mcs_execplan.cpp @@ -1881,7 +1881,8 @@ bool buildPredicateItem(Item_func* ifp, gp_walk_info* gwip) } } - if (get_fe_conn_info_ptr() == NULL) { + if (get_fe_conn_info_ptr() == NULL) + { set_fe_conn_info_ptr((void*)new cal_connection_info()); thd_set_ha_data(current_thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -2796,7 +2797,8 @@ void setError(THD* thd, uint32_t errcode, string errmsg) thd->raise_error_printf(errcode, errmsg.c_str()); // reset expressionID - if (get_fe_conn_info_ptr() == NULL) { + if (get_fe_conn_info_ptr() == NULL) + { set_fe_conn_info_ptr((void*)new cal_connection_info()); thd_set_ha_data(current_thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -3608,7 +3610,8 @@ ReturnedColumn* buildBooleanConstantColumn(Item* item, gp_walk_info& gwi, bool& ArithmeticColumn* buildArithmeticColumn(Item_func* item, gp_walk_info& gwi, bool& nonSupport) { - if 
(get_fe_conn_info_ptr() == NULL) { + if (get_fe_conn_info_ptr() == NULL) + { set_fe_conn_info_ptr((void*)new cal_connection_info()); thd_set_ha_data(current_thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -3871,7 +3874,8 @@ ArithmeticColumn* buildArithmeticColumn(Item_func* item, gp_walk_info& gwi, bool ReturnedColumn* buildFunctionColumn(Item_func* ifp, gp_walk_info& gwi, bool& nonSupport, bool selectBetweenIn) { - if (get_fe_conn_info_ptr() == NULL) { + if (get_fe_conn_info_ptr() == NULL) + { set_fe_conn_info_ptr((void*)new cal_connection_info()); thd_set_ha_data(current_thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -3964,7 +3968,8 @@ ReturnedColumn* buildFunctionColumn(Item_func* ifp, gp_walk_info& gwi, bool& non nonSupport = true; gwi.fatalParseError = true; Message::Args args; - string info = funcName + " with argument count > " + std::to_string(std::numeric_limits::max()); + string info = + funcName + " with argument count > " + std::to_string(std::numeric_limits::max()); args.add(info); gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_NON_SUPPORTED_FUNCTION, args); return NULL; @@ -4492,7 +4497,8 @@ ReturnedColumn* buildFunctionColumn(Item_func* ifp, gp_walk_info& gwi, bool& non FunctionColumn* buildCaseFunction(Item_func* item, gp_walk_info& gwi, bool& nonSupport) { - if (get_fe_conn_info_ptr() == NULL) { + if (get_fe_conn_info_ptr() == NULL) + { set_fe_conn_info_ptr((void*)new cal_connection_info()); thd_set_ha_data(current_thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -4920,7 +4926,8 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi) vector orderCols; ConstArgParam constArgParam; - if (get_fe_conn_info_ptr() == NULL) { + if (get_fe_conn_info_ptr() == NULL) + { set_fe_conn_info_ptr((void*)new cal_connection_info()); thd_set_ha_data(current_thd, mcs_hton, get_fe_conn_info_ptr()); } @@ -5951,7 +5958,9 @@ void gp_walk(const Item* item, void* arg) // bug 3137. If filter constant like 1=0, put it to ptWorkStack // MariaDB bug 750. 
Breaks if compare is an argument to a function. - // if ((int32_t)gwip->rcWorkStack.size() <= (gwip->rcBookMarkStack.empty() ? 0 + // if ((int32_t)gwip->rcWorkStack.size() <= (gwip->rcBookMarkStack.empty() + //? + // 0 //: gwip->rcBookMarkStack.top()) // && isPredicateFunction(ifp, gwip)) if (isPredicateFunction(ifp, gwip)) @@ -6223,12 +6232,13 @@ void gp_walk(const Item* item, void* arg) if (operand) { gwip->rcWorkStack.push(operand); - if (i == 0 && gwip->scsp == NULL) // first item is the WHEN LHS + if (i == 0 && gwip->scsp == NULL) // first item is the WHEN LHS { SimpleColumn* sc = dynamic_cast(operand); if (sc) { - gwip->scsp.reset(sc->clone()); // We need to clone else sc gets double deleted. This code is rarely executed so the cost is acceptable. + gwip->scsp.reset(sc->clone()); // We need to clone else sc gets double deleted. This code is + // rarely executed so the cost is acceptable. } } } @@ -6686,7 +6696,7 @@ void setExecutionParams(gp_walk_info& gwi, SCSEP& csep) csep->djsSmallSideLimit(get_diskjoin_smallsidelimit(gwi.thd) * 1024ULL * 1024); csep->djsLargeSideLimit(get_diskjoin_largesidelimit(gwi.thd) * 1024ULL * 1024); csep->djsPartitionSize(get_diskjoin_bucketsize(gwi.thd) * 1024ULL * 1024); - + csep->maxPmJoinResultCount(get_max_pm_join_result_count(gwi.thd)); if (get_um_mem_limit(gwi.thd) == 0) csep->umMemLimit(numeric_limits::max()); else @@ -7405,8 +7415,7 @@ void buildInToExistsFilter(gp_walk_info& gwi, SELECT_LEX& select_lex) * error id as an int ***********************************************************/ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool isUnion, - bool isSelectHandlerTop, bool isSelectLexUnit, - const std::vector& condStack) + bool isSelectHandlerTop, bool isSelectLexUnit, const std::vector& condStack) { #ifdef DEBUG_WALK_COND cerr << "getSelectPlan()" << endl; @@ -7434,8 +7443,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i 
CalpontSelectExecutionPlan::SelectList derivedTbList; // @bug 1796. Remember table order on the FROM list. gwi.clauseType = FROM; - if ((rc = processFrom(isUnion, select_lex, gwi, csep, isSelectHandlerTop, - isSelectLexUnit))) + if ((rc = processFrom(isUnion, select_lex, gwi, csep, isSelectHandlerTop, isSelectLexUnit))) { return rc; } @@ -7900,7 +7908,8 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i gwi.returnedCols[i]->hasAggregate(true); } - gwi.returnedCols[i]->resultType(CalpontSystemCatalog::ColType::convertUnionColType(coltypes, unionedTypeRc)); + gwi.returnedCols[i]->resultType( + CalpontSystemCatalog::ColType::convertUnionColType(coltypes, unionedTypeRc)); if (unionedTypeRc != 0) { @@ -9132,14 +9141,14 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro { // MCOL-1052 The condition could be useless. // MariaDB bug 624 - without the fix_fields call, delete with join may error with "No query step". - //#if MYSQL_VERSION_ID < 50172 + // #if MYSQL_VERSION_ID < 50172 //@bug 3039. 
fix fields for constants if (!icp->fixed()) { icp->fix_fields(gwi.thd, (Item**)&icp); } - //#endif + // #endif gwi.fatalParseError = false; #ifdef DEBUG_WALK_COND cerr << "------------------ WHERE -----------------------" << endl; @@ -9733,7 +9742,8 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro gwi.returnedCols[i]->hasAggregate(true); } - gwi.returnedCols[i]->resultType(CalpontSystemCatalog::ColType::convertUnionColType(coltypes, unionedTypeRc)); + gwi.returnedCols[i]->resultType( + CalpontSystemCatalog::ColType::convertUnionColType(coltypes, unionedTypeRc)); if (unionedTypeRc != 0) { diff --git a/dbcon/mysql/ha_mcs_sysvars.cpp b/dbcon/mysql/ha_mcs_sysvars.cpp index 9eecc6d73..77d6d2545 100644 --- a/dbcon/mysql/ha_mcs_sysvars.cpp +++ b/dbcon/mysql/ha_mcs_sysvars.cpp @@ -39,8 +39,8 @@ static MYSQL_THDVAR_ENUM(compression_type, PLUGIN_VAR_RQCMDARG, "SNAPPY segment files are Snappy compressed (default);" #ifdef HAVE_LZ4 "LZ4 segment files are LZ4 compressed;", -# else - , +#else + , #endif NULL, // check NULL, // update @@ -135,6 +135,10 @@ static MYSQL_THDVAR_ULONG(diskjoin_bucketsize, PLUGIN_VAR_RQCMDARG, "The maximum size in MB of each 'small side' table in memory.", NULL, NULL, 100, 1, ~0U, 1); +static MYSQL_THDVAR_ULONG(max_pm_join_result_count, PLUGIN_VAR_RQCMDARG, + "The maximum size of the join result for the single block on BPP.", NULL, NULL, + 1048576, 1, ~0U, 1); + static MYSQL_THDVAR_ULONG(um_mem_limit, PLUGIN_VAR_RQCMDARG, "Per user Memory limit(MB). 
Switch to disk-based JOIN when limit is reached", NULL, NULL, 0, 0, ~0U, 1); @@ -191,21 +195,21 @@ static MYSQL_THDVAR_ULONGLONG(cache_flush_threshold, PLUGIN_VAR_RQCMDARG, "Threshold on the number of rows in the cache to trigger a flush", NULL, NULL, 500000, 1, 1000000000, 1); -static MYSQL_THDVAR_STR(cmapi_host, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "CMAPI host", NULL, NULL, +static MYSQL_THDVAR_STR(cmapi_host, PLUGIN_VAR_NOCMDOPT | PLUGIN_VAR_MEMALLOC, "CMAPI host", NULL, NULL, "https://localhost"); -static MYSQL_THDVAR_STR(cmapi_version, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "CMAPI version", NULL, NULL, +static MYSQL_THDVAR_STR(cmapi_version, PLUGIN_VAR_NOCMDOPT | PLUGIN_VAR_MEMALLOC, "CMAPI version", NULL, NULL, "0.4.0"); -static MYSQL_THDVAR_STR(cmapi_key, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "CMAPI key", NULL, NULL, - ""); +static MYSQL_THDVAR_STR(cmapi_key, PLUGIN_VAR_NOCMDOPT | PLUGIN_VAR_MEMALLOC, "CMAPI key", NULL, NULL, ""); -static MYSQL_THDVAR_ULONGLONG(cmapi_port, PLUGIN_VAR_NOCMDOPT, "CMAPI port", NULL, - NULL, 8640, 100, 65356, 1); +static MYSQL_THDVAR_ULONGLONG(cmapi_port, PLUGIN_VAR_NOCMDOPT, "CMAPI port", NULL, NULL, 8640, 100, 65356, 1); -static MYSQL_THDVAR_STR(s3_key, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "S3 Authentication Key ", NULL, NULL, ""); -static MYSQL_THDVAR_STR(s3_secret, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "S3 Authentication Secret", NULL, NULL, ""); -static MYSQL_THDVAR_STR(s3_region, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "S3 region", NULL, NULL, ""); +static MYSQL_THDVAR_STR(s3_key, PLUGIN_VAR_NOCMDOPT | PLUGIN_VAR_MEMALLOC, "S3 Authentication Key ", NULL, + NULL, ""); +static MYSQL_THDVAR_STR(s3_secret, PLUGIN_VAR_NOCMDOPT | PLUGIN_VAR_MEMALLOC, "S3 Authentication Secret", + NULL, NULL, ""); +static MYSQL_THDVAR_STR(s3_region, PLUGIN_VAR_NOCMDOPT | PLUGIN_VAR_MEMALLOC, "S3 region", NULL, NULL, ""); st_mysql_sys_var* mcs_system_variables[] = {MYSQL_SYSVAR(compression_type), MYSQL_SYSVAR(fe_conn_info_ptr), @@ 
-224,6 +228,7 @@ st_mysql_sys_var* mcs_system_variables[] = {MYSQL_SYSVAR(compression_type), MYSQL_SYSVAR(diskjoin_smallsidelimit), MYSQL_SYSVAR(diskjoin_largesidelimit), MYSQL_SYSVAR(diskjoin_bucketsize), + MYSQL_SYSVAR(max_pm_join_result_count), MYSQL_SYSVAR(um_mem_limit), MYSQL_SYSVAR(double_for_decimal_math), MYSQL_SYSVAR(decimal_overflow_check), @@ -256,8 +261,10 @@ void* get_fe_conn_info_ptr(THD* thd) void set_fe_conn_info_ptr(void* ptr, THD* thd) { - if (thd == NULL) thd = current_thd; - if (thd == NULL) return; + if (thd == NULL) + thd = current_thd; + if (thd == NULL) + return; THDVAR(thd, fe_conn_info_ptr) = (uint64_t)(ptr); } @@ -420,6 +427,15 @@ void set_diskjoin_bucketsize(THD* thd, ulong value) THDVAR(thd, diskjoin_bucketsize) = value; } +ulong get_max_pm_join_result_count(THD* thd) +{ + return (thd == NULL) ? 1048576 : THDVAR(thd, max_pm_join_result_count); // no session: use the sysvar default; 0 is below its declared minimum of 1 +} +void set_max_pm_join_result_count(THD* thd, ulong value) +{ + THDVAR(thd, max_pm_join_result_count) = value; +} + ulong get_um_mem_limit(THD* thd) { return (thd == NULL) ? 
0 : THDVAR(thd, um_mem_limit); diff --git a/dbcon/mysql/ha_mcs_sysvars.h b/dbcon/mysql/ha_mcs_sysvars.h index ec116987d..2744a224f 100644 --- a/dbcon/mysql/ha_mcs_sysvars.h +++ b/dbcon/mysql/ha_mcs_sysvars.h @@ -108,6 +108,9 @@ void set_diskjoin_largesidelimit(THD* thd, ulong value); ulong get_diskjoin_bucketsize(THD* thd); void set_diskjoin_bucketsize(THD* thd, ulong value); +ulong get_max_pm_join_result_count(THD* thd); +void set_max_pm_join_result_count(THD* thd, ulong value); + ulong get_um_mem_limit(THD* thd); void set_um_mem_limit(THD* thd, ulong value); diff --git a/mysql-test/columnstore/bugfixes/mcol-5522.result b/mysql-test/columnstore/bugfixes/mcol-5522.result new file mode 100644 index 000000000..a9e4a6add --- /dev/null +++ b/mysql-test/columnstore/bugfixes/mcol-5522.result @@ -0,0 +1,36 @@ +DROP DATABASE IF EXISTS mcol_5522; +CREATE DATABASE mcol_5522; +USE mcol_5522; +create table t1 (a int) engine=columnstore; +create table t2 (a int) engine=columnstore; +insert into t1 values (1), (2), (3), (4); +insert into t2 values (1), (2), (3), (4); +create table t3 (a varchar(200)) engine=columnstore; +create table t4 (a varchar(200)) engine=columnstore; +insert into t3 values ("one"), ("two"), ("three"); +insert into t4 values ("one"), ("two"), ("three"); +set session columnstore_max_pm_join_result_count=1; +select * from t1, t2 where t1.a = t2.a; +a a +1 1 +2 2 +3 3 +4 4 +select * from t3, t4 where t3.a = t4.a; +a a +one one +two two +three three +set session columnstore_max_pm_join_result_count=1048576; +select * from t1, t2 where t1.a = t2.a; +a a +1 1 +2 2 +3 3 +4 4 +select * from t3, t4 where t3.a = t4.a; +a a +one one +two two +three three +DROP DATABASE mcol_5522; diff --git a/mysql-test/columnstore/bugfixes/mcol-5522.test b/mysql-test/columnstore/bugfixes/mcol-5522.test new file mode 100644 index 000000000..cde55077b --- /dev/null +++ b/mysql-test/columnstore/bugfixes/mcol-5522.test @@ -0,0 +1,29 @@ +--source ../include/have_columnstore.inc + 
+--disable_warnings +DROP DATABASE IF EXISTS mcol_5522; +--enable_warnings +CREATE DATABASE mcol_5522; +USE mcol_5522; + +create table t1 (a int) engine=columnstore; +create table t2 (a int) engine=columnstore; +insert into t1 values (1), (2), (3), (4); +insert into t2 values (1), (2), (3), (4); + +create table t3 (a varchar(200)) engine=columnstore; +create table t4 (a varchar(200)) engine=columnstore; +insert into t3 values ("one"), ("two"), ("three"); +insert into t4 values ("one"), ("two"), ("three"); + +set session columnstore_max_pm_join_result_count=1; +select * from t1, t2 where t1.a = t2.a; +select * from t3, t4 where t3.a = t4.a; + +set session columnstore_max_pm_join_result_count=1048576; +select * from t1, t2 where t1.a = t2.a; +select * from t3, t4 where t3.a = t4.a; + +--disable_warnings +DROP DATABASE mcol_5522; +--enable_warnings diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index e2a03757b..62134e3c5 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -282,7 +282,6 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) if (ot == ROW_GROUP) { bs >> outputRG; - // outputRG.setUseStringTable(true); bs >> tmp8; if (tmp8) @@ -305,11 +304,12 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) if (doJoin) { pthread_mutex_lock(&objLock); + bs >> maxPmJoinResultCount; + maxPmJoinResultCount = std::max(maxPmJoinResultCount, (uint32_t)1); if (ot == ROW_GROUP) { bs >> joinerCount; - // cout << "joinerCount = " << joinerCount << endl; joinTypes.reset(new JoinType[joinerCount]); tJoiners.reset(new std::shared_ptr[]>[joinerCount]); @@ -349,8 +349,6 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) uint32_t tmp32; bs >> tmp32; tJoinerSizes[i] = tmp32; - // bs >> tJoinerSizes[i]; - // cout << "joiner size = " << tJoinerSizes[i] << endl; bs >> joinTypes[i]; bs >> tmp8; typelessJoin[i] = (bool)tmp8; @@ -369,7 +367,6 @@ void 
BatchPrimitiveProcessor::initBPP(ByteStream& bs) { bs >> joinNullValues[i]; bs >> largeSideKeyColumns[i]; - // cout << "large side key is " << largeSideKeyColumns[i] << endl; for (uint j = 0; j < processorThreads; ++j) tJoiners[i][j].reset(new TJoiner(10, TupleJoiner::hasher())); } @@ -410,8 +407,6 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) if (getTupleJoinRowGroupData) { deserializeVector(bs, smallSideRGs); - // cout << "deserialized " << smallSideRGs.size() << " small-side - // rowgroups\n"; idbassert(smallSideRGs.size() == joinerCount); smallSideRowLengths.reset(new uint32_t[joinerCount]); smallSideRowData.reset(new RGData[joinerCount]); @@ -424,9 +419,6 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) smallSideRowLengths[i] = smallSideRGs[i].getRowSize(); ; smallSideRowData[i] = RGData(smallSideRGs[i], tJoinerSizes[i]); - // smallSideRowData[i].reset(new uint8_t[ - // smallSideRGs[i].getEmptySize() + - // (uint64_t) smallSideRowLengths[i] * tJoinerSizes[i]]); smallSideRGs[i].setData(&smallSideRowData[i]); smallSideRGs[i].resetRowGroup(0); ssrdPos[i] = smallSideRGs[i].getEmptySize(); @@ -446,7 +438,6 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) bs >> largeSideRG; bs >> joinedRG; - // cout << "got the joined Rowgroup: " << joinedRG.toString() << "\n"; } } @@ -457,13 +448,11 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) bs >> filterCount; filterSteps.resize(filterCount); - // cout << "deserializing " << filterCount << " filters\n"; hasScan = false; hasPassThru = false; for (i = 0; i < filterCount; ++i) { - // cout << "deserializing step " << i << endl; filterSteps[i] = SCommand(Command::makeCommand(bs, &type, filterSteps)); if (type == Command::COLUMN_COMMAND) @@ -488,12 +477,10 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) } bs >> projectCount; - // cout << "deserializing " << projectCount << " projected columns\n\n"; projectSteps.resize(projectCount); for (i = 0; i < projectCount; ++i) { - // cout << 
"deserializing step " << i << endl; projectSteps[i] = SCommand(Command::makeCommand(bs, &type, projectSteps)); if (type == Command::PASS_THRU) @@ -1165,21 +1152,20 @@ void BatchPrimitiveProcessor::initProcessor() asyncLoaded.reset(new bool[projectCount + 2]); } -/* This version does a join on projected rows */ +// This version does a join on projected rows // In order to prevent super size result sets in the case of near cartesian joins on three or more joins, // the startRid start at 0) is used to begin the rid loop and if we cut off processing early because of // the size of the result set, we return the next rid to start with. If we finish ridCount rids, return 0- -uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid) +uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid, RowGroup& largeSideRowGroup) { uint32_t newRowCount = 0, i, j; vector matches; uint64_t largeKey; uint64_t resultCount = 0; uint32_t newStartRid = startRid; - outputRG.getRow(0, &oldRow); + largeSideRowGroup.getRow(startRid, &oldRow); outputRG.getRow(0, &newRow); - // cout << "before join, RG has " << outputRG.getRowCount() << " BPP ridcount= " << ridCount << endl; // ridCount gets modified based on the number of Rids actually processed during this call. // origRidCount is the number of rids for this thread after filter, which are the total // number of rids to be processed from all calls to this function during this thread. @@ -1196,7 +1182,6 @@ uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid) * are NULL values to match against, but there is no filter, all rows can be eliminated. 
*/ - // cout << "large side row: " << oldRow.toString() << endl; for (j = 0; j < joinerCount; j++) { bool found; @@ -1211,7 +1196,6 @@ uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid) if (LIKELY(!typelessJoin[j])) { - // cout << "not typeless join\n"; bool isNull; uint32_t colIndex = largeSideKeyColumns[j]; @@ -1235,16 +1219,11 @@ uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid) ((joinTypes[j] & ANTI) && !joinerIsEmpty && ((isNull && (joinTypes[j] & MATCHNULLS)) || (found && !isNull)))) { - // cout << " - not in the result set\n"; break; } - - // else - // cout << " - in the result set\n"; } else { - // cout << " typeless join\n"; // the null values are not sent by UM in typeless case. null -> !found TypelessData tlLargeKey(&oldRow); uint bucket = oldRow.hashTypeless(tlLargeSideKeyColumns[j], mSmallSideKeyColumnsPtr, @@ -1254,9 +1233,7 @@ uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid) if ((!found && !(joinTypes[j] & (LARGEOUTER | ANTI))) || (joinTypes[j] & ANTI)) { - /* Separated the ANTI join logic for readability. - * - */ + // Separated the ANTI join logic for readability. 
if (joinTypes[j] & ANTI) { if (found) @@ -1321,9 +1298,6 @@ uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid) else { smallSideRGs[j].getRow(tSmallSideMatches[j][newRowCount][k], &smallRows[j]); - // uint64_t rowOffset = ((uint64_t) tSmallSideMatches[j][newRowCount][k]) * - // smallRows[j].getSize() + smallSideRGs[j].getEmptySize(); - // smallRows[j].setData(&smallSideRowData[j][rowOffset]); } applyMapping(joinFEMappings[j], smallRows[j], &joinFERow); @@ -1396,44 +1370,28 @@ uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid) wide128Values[newRowCount] = wide128Values[i]; relRids[newRowCount] = relRids[i]; copyRow(oldRow, &newRow); - // cout << "joined row: " << newRow.toString() << endl; } newRowCount++; newRow.nextRow(); } - // else - // cout << "j != joinerCount\n"; } - // If we've accumulated more than maxResultCount -- 1048576 (2^20)_ of resultCounts, cut off processing. + // Once `resultCount` reaches `maxPmJoinResultCount`, cut off processing. // The caller will restart to continue where we left off. - if (resultCount >= maxResultCount) + if (resultCount >= maxPmJoinResultCount) { - // FIXME: Implement proper pipleline. (MCOL-5522). - cerr << "BPP join match count exceeded the limit, match count: " << resultCount << endl; - resultCount = 0; + // Resume from the next large side row on the following call. + newStartRid = i + 1; + break; } } - if (resultCount < maxResultCount) + if (resultCount < maxPmJoinResultCount) newStartRid = 0; ridCount = newRowCount; outputRG.setRowCount(ridCount); - /* prints out the whole result set. 
- if (ridCount != 0) { - cout << "RG rowcount=" << outputRG.getRowCount() << " BPP ridcount=" << ridCount << endl; - for (i = 0; i < joinerCount; i++) { - for (j = 0; j < ridCount; j++) { - cout << "joiner " << i << " has " << tSmallSideMatches[i][j].size() << " - entries" << endl; cout << "row " << j << ":"; for (uint32_t k = 0; k < tSmallSideMatches[i][j].size(); - k++) cout << " " << tSmallSideMatches[i][j][k]; cout << endl; - } - cout << endl; - } - } - */ return newStartRid; } @@ -1780,16 +1738,24 @@ void BatchPrimitiveProcessor::execute() } } + // Duplicate the projected `RGData` into a `large side` row group. + // The `large side` row group is created from the `output` row group + // to preserve the original rows, because the `output` row group is + // overwritten with the matched rows by `executeTupleJoin()`. + RGData largeSideRGData = outputRG.duplicate(); + RowGroup largeSideRowGroup = outputRG; + largeSideRowGroup.setData(&largeSideRGData); + do // while (startRid > 0) { #ifdef PRIMPROC_STOPWATCH stopwatch->start("-- executeTupleJoin()"); - startRid = executeTupleJoin(startRid); + startRid = executeTupleJoin(startRid, largeSideRowGroup); stopwatch->stop("-- executeTupleJoin()"); #else - startRid = executeTupleJoin(startRid); -// sStartRid = startRid; + startRid = executeTupleJoin(startRid, largeSideRowGroup); #endif + /* project the non-key columns */ for (j = 0; j < projectCount; ++j) { @@ -2865,7 +2831,6 @@ void BatchPrimitiveProcessor::buildVSSCache(uint32_t loopCount) for (i = 0; i < vssData.size(); i++) vssCache.insert(make_pair(lbidList[i], vssData[i])); - // cout << "buildVSSCache inserted " << vssCache.size() << " elements" << endl; } } // namespace primitiveprocessor diff --git a/primitives/primproc/batchprimitiveprocessor.h b/primitives/primproc/batchprimitiveprocessor.h index c2ffb08a2..730ee2a6b 100644 --- a/primitives/primproc/batchprimitiveprocessor.h +++ b/primitives/primproc/batchprimitiveprocessor.h @@ -336,7 +336,7 @@ class BatchPrimitiveProcessor std::shared_ptr[]>[]> 
tJoiners; typedef std::vector MatchedData[LOGICAL_BLOCK_RIDS]; std::shared_ptr tSmallSideMatches; - uint32_t executeTupleJoin(uint32_t startRid); + uint32_t executeTupleJoin(uint32_t startRid, rowgroup::RowGroup& largeSideRowGroup); bool getTupleJoinRowGroupData; std::vector smallSideRGs; rowgroup::RowGroup largeSideRG; @@ -438,7 +438,7 @@ class BatchPrimitiveProcessor bool initiatedByEM_; uint32_t weight_; - static const uint64_t maxResultCount = 1048576; // 2^20 + uint32_t maxPmJoinResultCount = 1048576; // 2^20 by default; initBPP() overwrites it from the ByteStream (clamped to at least 1) friend class Command; friend class ColumnCommand; friend class DictStep;