From 38fd96a663517350e7cf4e229e9d147a99a946ff Mon Sep 17 00:00:00 2001 From: Serguey Zefirov Date: Mon, 30 Sep 2024 14:50:35 +0300 Subject: [PATCH] fix(memory leaks): MCOL-5791 - get rid of memory leaks in plugin code There were numerous memory leaks in plugin's code and associated code. During typical run of MTR tests it leaked around 65 megabytes of objects. As a result they may severely affect long-lived connections. This patch fixes (almost) all leaks found in the plugin. The exceptions are two leaks associated with SHOW CREATE TABLE columnstore_table and getting information of columns of columnstore-handled table. These should be fixed on the server side and work is on the way. --- dbcon/ddlpackage/ddl.l | 8 +- dbcon/ddlpackage/ddl.y | 21 +- dbcon/dmlpackage/dml.l | 4 +- dbcon/mysql/ha_exists_sub.cpp | 2 +- dbcon/mysql/ha_from_sub.cpp | 2 +- dbcon/mysql/ha_in_sub.cpp | 2 +- dbcon/mysql/ha_mcs_ddl.cpp | 8 +- dbcon/mysql/ha_mcs_execplan.cpp | 231 +++++++++++++++++----- dbcon/mysql/ha_mcs_impl.cpp | 97 +++++---- dbcon/mysql/ha_mcs_impl_if.h | 35 +++- dbcon/mysql/ha_pseudocolumn.cpp | 1 + dbcon/mysql/ha_scalar_sub.cpp | 4 +- dbcon/mysql/ha_select_sub.cpp | 2 +- dbcon/mysql/ha_subquery.h | 19 ++ dbcon/mysql/ha_view.cpp | 12 +- dbcon/mysql/ha_window_function.cpp | 8 +- dbcon/mysql/is_columnstore_extents.cpp | 20 +- dbcon/mysql/sm.cpp | 6 +- dbcon/mysql/sm.h | 10 +- tests/CMakeLists.txt | 3 +- tests/scripts/fullmtr.sh | 16 +- versioning/BRM/brmshmimpl.cpp | 1 + writeengine/bulk/we_bulkload.cpp | 40 ++-- writeengine/bulk/we_bulkload.h | 4 +- writeengine/bulk/we_bulkloadbuffer.cpp | 32 +-- writeengine/bulk/we_extentstripealloc.cpp | 4 +- writeengine/bulk/we_extentstripealloc.h | 39 ++-- writeengine/bulk/we_tableinfo.cpp | 2 +- writeengine/bulk/we_workers.cpp | 60 +++--- writeengine/shared/we_brm.cpp | 5 + writeengine/shared/we_brm.h | 3 + 31 files changed, 472 insertions(+), 229 deletions(-) diff --git a/dbcon/ddlpackage/ddl.l b/dbcon/ddlpackage/ddl.l index c0aec2a94..5404542f0 100644 --- a/dbcon/ddlpackage/ddl.l +++ b/dbcon/ddlpackage/ddl.l @@ -34,7 +34,7 @@ int ddldebug = 0; int lineno = 1; void ddlerror(struct pass_to_bison* x, char const *s); -static char* scanner_copy(char *str, yyscan_t yyscanner, copy_action_t action = NOOP ); +static char* scanner_copy(const char *str, yyscan_t yyscanner, copy_action_t action = NOOP ); %} @@ -115,9 +115,9 @@ CONSTRAINT {return CONSTRAINT;} CONSTRAINTS {return CONSTRAINTS;} CREATE {return CREATE;} CURRENT_USER {return CURRENT_USER;} -DATE {ddlget_lval(yyscanner)->str=strdup("date"); return DATE;} +DATE {ddlget_lval(yyscanner)->str = scanner_copy("date", yyscanner); return DATE;} DATETIME {return DATETIME;} -TIME {ddlget_lval(yyscanner)->str=strdup("time"); return TIME;} +TIME {ddlget_lval(yyscanner)->str = scanner_copy("time", yyscanner); return TIME;} TIMESTAMP {return TIMESTAMP;} DECIMAL {return DECIMAL;} DEC {return DECIMAL;} @@ -276,7 +276,7 @@ void scanner_finish(yyscan_t yyscanner) pScanData->valbuf.clear(); } -char* scanner_copy (char *str, yyscan_t yyscanner, copy_action_t action) +char* scanner_copy (const char *str, yyscan_t yyscanner, copy_action_t action) { char* result; char* nv = strdup(str); diff --git a/dbcon/ddlpackage/ddl.y b/dbcon/ddlpackage/ddl.y index 83f4913f9..1e42314b5 100644 --- a/dbcon/ddlpackage/ddl.y +++ b/dbcon/ddlpackage/ddl.y @@ -105,7 +105,6 @@ void fix_column_length_and_charset(SchemaObject* elem, const CHARSET_INFO* def_c column->fType->fLength = 16777215; } } - %} %expect 17 @@ -255,6 +254,24 @@ ZEROFILL %type 
opt_quoted_literal %type opt_column_charset %type opt_column_collate + +// for pointers to vectors of pointers: +%destructor { if ($$) { for (auto p : *($$)) { delete p; } }; delete $$; } table_element_list + +// for objects allocated during parse:. +%destructor { delete $$; } qualified_name ident table_element column_def exact_numeric_type +%destructor { delete $$; } table_options opt_table_options table_name +%destructor { delete $$; } column_name_list data_type column_constraint +%destructor { delete $$; } column_constraint_def column_qualifier_list +%destructor { delete $$; } opt_referential_triggered_action referential_triggered_action +%destructor { delete $$; } table_option character_string_type binary_string_type blob_type +%destructor { delete $$; } text_type numeric_type table_constraint table_constraint_def + +// NOTE: if you have a leak in this code and do not know which one leaks +// add %destructor { printf("pop yykind %d\n"; fflush(stdout); } <*> +// this will print tags in residual stack after syntax error and you'll see +// what is not delete'd. + %% stmtblock: stmtmulti { x->fParseTree = $1; } ; @@ -685,7 +702,7 @@ qualified_name: if (x->fDBSchema.size()) $$ = new QualifiedName((char*)x->fDBSchema.c_str(), $1); else - $$ = new QualifiedName($1); + $$ = new QualifiedName($1); } | ident '.' ident { diff --git a/dbcon/dmlpackage/dml.l b/dbcon/dmlpackage/dml.l index fca2d3f25..babff264e 100644 --- a/dbcon/dmlpackage/dml.l +++ b/dbcon/dmlpackage/dml.l @@ -42,7 +42,7 @@ namespace dmlpackage { int lineno = 1; -static char* scanner_copy (char *str, yyscan_t yyscanner); +static char* scanner_copy (const char *str, yyscan_t yyscanner); /* macro to save the text and return a token */ @@ -256,7 +256,7 @@ void scanner_finish(yyscan_t yyscanner) pScanData->valbuf.clear(); } -char* scanner_copy (char *str, yyscan_t yyscanner) +char* scanner_copy (const char *str, yyscan_t yyscanner) { char* nv = strdup(str); if(nv) diff --git a/dbcon/mysql/ha_exists_sub.cpp b/dbcon/mysql/ha_exists_sub.cpp index a97abfa44..181d1c83c 100644 --- a/dbcon/mysql/ha_exists_sub.cpp +++ b/dbcon/mysql/ha_exists_sub.cpp @@ -96,7 +96,7 @@ execplan::ParseTree* ExistsSub::transform() csep->subType(CalpontSelectExecutionPlan::EXISTS_SUBS); // gwi for the sub query - gp_walk_info gwi(fGwip.timeZone); + gp_walk_info gwi(fGwip.timeZone, fGwip.subQueriesChain); gwi.thd = fGwip.thd; gwi.subQuery = this; diff --git a/dbcon/mysql/ha_from_sub.cpp b/dbcon/mysql/ha_from_sub.cpp index 9013c8fe6..177887323 100644 --- a/dbcon/mysql/ha_from_sub.cpp +++ b/dbcon/mysql/ha_from_sub.cpp @@ -424,7 +424,7 @@ SCSEP FromSubQuery::transform() csep->subType(CalpontSelectExecutionPlan::FROM_SUBS); // gwi for the sub query - gp_walk_info gwi(fGwip.timeZone); + gp_walk_info gwi(fGwip.timeZone, fGwip.subQueriesChain); gwi.thd = fGwip.thd; gwi.subQuery = this; gwi.viewName = fGwip.viewName; diff --git a/dbcon/mysql/ha_in_sub.cpp b/dbcon/mysql/ha_in_sub.cpp index 989e06ad5..8b2772efd 100644 --- a/dbcon/mysql/ha_in_sub.cpp +++ b/dbcon/mysql/ha_in_sub.cpp @@ -151,7 +151,7 @@ execplan::ParseTree* InSub::transform() csep->subType(CalpontSelectExecutionPlan::IN_SUBS); // gwi for the sub query - gp_walk_info gwi(fGwip.timeZone); + gp_walk_info gwi(fGwip.timeZone, fGwip.subQueriesChain); gwi.thd = fGwip.thd; gwi.subQuery = this; diff --git a/dbcon/mysql/ha_mcs_ddl.cpp b/dbcon/mysql/ha_mcs_ddl.cpp index 9695d21b2..fac2d176e 100644 --- a/dbcon/mysql/ha_mcs_ddl.cpp +++ b/dbcon/mysql/ha_mcs_ddl.cpp @@ -344,7 +344,7 @@ bool anyRowInTable(string& schema, 
string& tableName, int sessionID) rowgroup::RGData rgData; ByteStream::quadbyte qb = 4; msg << qb; - rowgroup::RowGroup* rowGroup = 0; + std::shared_ptr rowGroup = 0; bool anyRow = false; exemgrClient->write(msg); @@ -397,7 +397,7 @@ bool anyRowInTable(string& schema, string& tableName, int sessionID) if (!rowGroup) { // This is mete data - rowGroup = new rowgroup::RowGroup(); + rowGroup.reset(new rowgroup::RowGroup()); rowGroup->deserialize(msg); qb = 100; msg.restart(); @@ -515,7 +515,7 @@ bool anyTimestampColumn(string& schema, string& tableName, int sessionID) rowgroup::RGData rgData; ByteStream::quadbyte qb = 4; msg << qb; - rowgroup::RowGroup* rowGroup = 0; + std::shared_ptr rowGroup = 0; bool anyRow = false; exemgrClient->write(msg); @@ -568,7 +568,7 @@ bool anyTimestampColumn(string& schema, string& tableName, int sessionID) if (!rowGroup) { // This is mete data - rowGroup = new rowgroup::RowGroup(); + rowGroup.reset(new rowgroup::RowGroup()); rowGroup->deserialize(msg); qb = 100; msg.restart(); diff --git a/dbcon/mysql/ha_mcs_execplan.cpp b/dbcon/mysql/ha_mcs_execplan.cpp index e8b7b9a83..85a2b2968 100644 --- a/dbcon/mysql/ha_mcs_execplan.cpp +++ b/dbcon/mysql/ha_mcs_execplan.cpp @@ -293,13 +293,66 @@ string escapeBackTick(const char* str) return ret; } -void clearStacks(gp_walk_info& gwi) +cal_impl_if::gp_walk_info::~gp_walk_info() +{ + while (!rcWorkStack.empty()) + { + delete rcWorkStack.top(); + rcWorkStack.pop(); + } + + while (!ptWorkStack.empty()) + { + delete ptWorkStack.top(); + ptWorkStack.pop(); + } + for (uint32_t i=0;i& join_list, ParseTree* pt = new ParseTree(onFilter); outerJoinStack.push(pt); } + } else // inner join { @@ -1545,7 +1599,7 @@ ParseTree* buildRowPredicate(gp_walk_info* gwip, RowColumn* lhs, RowColumn* rhs, ParseTree* pt = new ParseTree(lo); sop->setOpType(lhs->columnVec()[0]->resultType(), rhs->columnVec()[0]->resultType()); - SimpleFilter* sf = new SimpleFilter(sop, lhs->columnVec()[0].get(), rhs->columnVec()[0].get()); + SimpleFilter* sf = new SimpleFilter(sop, lhs->columnVec()[0]->clone(), rhs->columnVec()[0]->clone()); sf->timeZone(gwip->timeZone); pt->left(new ParseTree(sf)); @@ -1553,7 +1607,7 @@ ParseTree* buildRowPredicate(gp_walk_info* gwip, RowColumn* lhs, RowColumn* rhs, { sop.reset(po->clone()); sop->setOpType(lhs->columnVec()[i]->resultType(), rhs->columnVec()[i]->resultType()); - SimpleFilter* sf = new SimpleFilter(sop, lhs->columnVec()[i].get(), rhs->columnVec()[i].get()); + SimpleFilter* sf = new SimpleFilter(sop, lhs->columnVec()[i]->clone(), rhs->columnVec()[i]->clone()); sf->timeZone(gwip->timeZone); pt->right(new ParseTree(sf)); @@ -1570,6 +1624,12 @@ ParseTree* buildRowPredicate(gp_walk_info* gwip, RowColumn* lhs, RowColumn* rhs, bool buildRowColumnFilter(gp_walk_info* gwip, RowColumn* rhs, RowColumn* lhs, Item_func* ifp) { + // rhs and lhs are being dismembered here and then get thrown away, leaking. + // So we create two scoped pointers to get them delete'd automatically at + // return. + // Also look below into heldoutVals vector - it contains values produced from + // ifp's arguments. + const std::unique_ptr rhsp(rhs), lhsp(lhs); if (ifp->functype() == Item_func::EQ_FUNC || ifp->functype() == Item_func::NE_FUNC) { // (c1,c2,..) = (v1,v2,...) transform to: c1=v1 and c2=v2 and ... 
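The gp_walk_info destructor added above frees whatever is still parked on the work stacks by deleting each owning raw pointer before popping it, so stack contents left behind by error paths no longer leak. A minimal standalone sketch of that drain-on-destruction pattern, using hypothetical Node/WorkInfo names rather than the real ColumnStore types:

    #include <stack>

    struct Node
    {
      virtual ~Node() = default;
    };

    // Owns every raw pointer pushed onto its stack and releases them all
    // when it goes out of scope, mirroring the gp_walk_info destructor above.
    struct WorkInfo
    {
      std::stack<Node*> workStack;

      ~WorkInfo()
      {
        while (!workStack.empty())
        {
          delete workStack.top();  // the stack holds owning raw pointers
          workStack.pop();
        }
      }
    };

    int main()
    {
      WorkInfo w;
      w.workStack.push(new Node);
      w.workStack.push(new Node);
      // both Nodes are deleted when w is destroyed, even if a caller bails out early
      return 0;
    }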
@@ -1601,6 +1661,7 @@ bool buildRowColumnFilter(gp_walk_info* gwip, RowColumn* rhs, RowColumn* lhs, It // two entries have been popped from the stack already: lhs and rhs stack tmpStack; vector valVec; + vector heldOutVals; // these vals are not rhs/lhs and need to be freed tmpStack.push(rhs); tmpStack.push(lhs); assert(gwip->rcWorkStack.size() >= ifp->argument_count() - 2); @@ -1608,6 +1669,7 @@ bool buildRowColumnFilter(gp_walk_info* gwip, RowColumn* rhs, RowColumn* lhs, It for (uint32_t i = 2; i < ifp->argument_count(); i++) { tmpStack.push(gwip->rcWorkStack.top()); + heldOutVals.push_back(SRCP(gwip->rcWorkStack.top())); if (!gwip->rcWorkStack.empty()) gwip->rcWorkStack.pop(); @@ -1627,7 +1689,7 @@ bool buildRowColumnFilter(gp_walk_info* gwip, RowColumn* rhs, RowColumn* lhs, It vals = dynamic_cast(tmpStack.top()); valVec.push_back(vals); tmpStack.pop(); - pt1->right(buildRowPredicate(gwip, columns->clone(), vals, predicateOp)); + pt1->right(buildRowPredicate(gwip, columns, vals, predicateOp)); pt = pt1; } @@ -1644,7 +1706,7 @@ bool buildRowColumnFilter(gp_walk_info* gwip, RowColumn* rhs, RowColumn* lhs, It for (uint32_t i = 0; i < columns->columnVec().size(); i++) { - ConstantFilter* cf = new ConstantFilter(); + std::unique_ptr cf(new ConstantFilter()); sop.reset(lo->clone()); cf->op(sop); @@ -1652,7 +1714,9 @@ bool buildRowColumnFilter(gp_walk_info* gwip, RowColumn* rhs, RowColumn* lhs, It // no optimization for non-simple column because CP won't apply if (!sc) + { continue; + } ssc.reset(sc->clone()); cf->col(ssc); @@ -1674,9 +1738,11 @@ bool buildRowColumnFilter(gp_walk_info* gwip, RowColumn* rhs, RowColumn* lhs, It } if (j < valVec.size()) + { continue; + } - tmpPtStack.push(new ParseTree(cf)); + tmpPtStack.push(new ParseTree(cf.release())); } // "and" all the filters together @@ -2027,7 +2093,10 @@ bool buildPredicateItem(Item_func* ifp, gp_walk_info* gwip) } if (!gwip->rcWorkStack.empty()) + { + delete gwip->rcWorkStack.top(); gwip->rcWorkStack.pop(); // pop gwip->scsp + } if (cf->filterList().size() < inp->argument_count() - 1) { @@ -2839,7 +2908,6 @@ void setError(THD* thd, uint32_t errcode, string errmsg) void setError(THD* thd, uint32_t errcode, string errmsg, gp_walk_info& gwi) { setError(thd, errcode, errmsg); - clearStacks(gwi); } int setErrorAndReturn(gp_walk_info& gwi) @@ -4114,6 +4182,7 @@ ReturnedColumn* buildFunctionColumn(Item_func* ifp, gp_walk_info& gwi, bool& non if (!sptp) { nonSupport = true; + delete fc; return NULL; } @@ -4128,6 +4197,7 @@ ReturnedColumn* buildFunctionColumn(Item_func* ifp, gp_walk_info& gwi, bool& non nonSupport = true; gwi.fatalParseError = true; gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_SUB_EXPRESSION); + delete fc; return NULL; } @@ -4159,6 +4229,7 @@ ReturnedColumn* buildFunctionColumn(Item_func* ifp, gp_walk_info& gwi, bool& non if (!rc || nonSupport) { nonSupport = true; + delete fc; return NULL; } @@ -4187,6 +4258,7 @@ ReturnedColumn* buildFunctionColumn(Item_func* ifp, gp_walk_info& gwi, bool& non else { nonSupport = true; + delete fc; return NULL; } } @@ -4583,6 +4655,7 @@ FunctionColumn* buildCaseFunction(Item_func* item, gp_walk_info& gwi, bool& nonS gwi.inCaseStmt = false; if (!gwi.ptWorkStack.empty() && *gwi.ptWorkStack.top() == *sptp.get()) { + delete gwi.ptWorkStack.top(); gwi.ptWorkStack.pop(); } } @@ -4603,6 +4676,7 @@ FunctionColumn* buildCaseFunction(Item_func* item, gp_walk_info& gwi, bool& nonS // We need to pop whichever stack is holding it, if any. 
if ((!gwi.rcWorkStack.empty()) && *gwi.rcWorkStack.top() == parm) { + delete gwi.rcWorkStack.top(); gwi.rcWorkStack.pop(); } else if (!gwi.ptWorkStack.empty()) @@ -4610,7 +4684,10 @@ FunctionColumn* buildCaseFunction(Item_func* item, gp_walk_info& gwi, bool& nonS ReturnedColumn* ptrc = dynamic_cast(gwi.ptWorkStack.top()->data()); if (ptrc && *ptrc == *parm) + { + delete gwi.ptWorkStack.top(); gwi.ptWorkStack.pop(); + } } } else @@ -4620,6 +4697,7 @@ FunctionColumn* buildCaseFunction(Item_func* item, gp_walk_info& gwi, bool& nonS // We need to pop whichever stack is holding it, if any. if ((!gwi.ptWorkStack.empty()) && *gwi.ptWorkStack.top()->data() == sptp->data()) { + delete gwi.ptWorkStack.top(); gwi.ptWorkStack.pop(); } else if (!gwi.rcWorkStack.empty()) @@ -4630,6 +4708,7 @@ FunctionColumn* buildCaseFunction(Item_func* item, gp_walk_info& gwi, bool& nonS if (ptrc && *ptrc == *gwi.rcWorkStack.top()) { + delete gwi.rcWorkStack.top(); gwi.rcWorkStack.pop(); } } @@ -5301,6 +5380,18 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi) ac->constCol(SRCP(rc)); break; } + // the "rc" can be in gwi.no_parm_func_list. erase it from that list and + // then delete it. + // kludge, I know. + uint32_t i; + + for (i = 0; gwi.no_parm_func_list[i] != rc && i < gwi.no_parm_func_list.size(); i++) { } + + if (i < gwi.no_parm_func_list.size()) + { + gwi.no_parm_func_list.erase(gwi.no_parm_func_list.begin() + i); + delete rc; + } } } @@ -5996,7 +6087,10 @@ void gp_walk(const Item* item, void* arg) { // @bug 4215. remove the argument in rcWorkStack. if (!gwip->rcWorkStack.empty()) + { + delete gwip->rcWorkStack.top(); gwip->rcWorkStack.pop(); + } break; } @@ -6031,6 +6125,7 @@ void gp_walk(const Item* item, void* arg) for (uint32_t i = 0; i < ifp->argument_count() && !gwip->rcWorkStack.empty(); i++) { + delete gwip->rcWorkStack.top(); gwip->rcWorkStack.pop(); } @@ -6207,6 +6302,7 @@ void gp_walk(const Item* item, void* arg) } else { + delete rhs; gwip->ptWorkStack.push(lhs); continue; } @@ -6740,21 +6836,21 @@ int processFrom(bool& isUnion, SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& if (table_ptr->derived) { SELECT_LEX* select_cursor = table_ptr->derived->first_select(); - FromSubQuery fromSub(gwi, select_cursor); + FromSubQuery* fromSub = new FromSubQuery(gwi, select_cursor); string alias(table_ptr->alias.str); if (lower_case_table_names) { boost::algorithm::to_lower(alias); } - fromSub.alias(alias); + fromSub->alias(alias); CalpontSystemCatalog::TableAliasName tn = make_aliasview("", "", alias, viewName); // @bug 3852. 
check return execplan - SCSEP plan = fromSub.transform(); + SCSEP plan = fromSub->transform(); if (!plan) { - setError(gwi.thd, ER_INTERNAL_ERROR, fromSub.gwip().parseErrorText, gwi); + setError(gwi.thd, ER_INTERNAL_ERROR, fromSub->gwip().parseErrorText, gwi); CalpontSystemCatalog::removeCalpontSystemCatalog(gwi.sessionid); return ER_INTERNAL_ERROR; } @@ -6878,7 +6974,7 @@ int processFrom(bool& isUnion, SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& plan->data(csep->data()); // gwi for the union unit - gp_walk_info union_gwi(gwi.timeZone); + gp_walk_info union_gwi(gwi.timeZone, gwi.subQueriesChain); union_gwi.thd = gwi.thd; uint32_t err = 0; @@ -7081,14 +7177,28 @@ int processWhere(SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& csep, const s std::stack outerJoinStack; if ((failed = buildJoin(gwi, select_lex.top_join_list, outerJoinStack))) + { + while (!outerJoinStack.empty()) + { + delete outerJoinStack.top(); + outerJoinStack.pop(); + } return failed; + } if (gwi.subQuery) { for (uint i = 0; i < gwi.viewList.size(); i++) { if ((failed = gwi.viewList[i]->processJoin(gwi, outerJoinStack))) + { + while (!outerJoinStack.empty()) + { + delete outerJoinStack.top(); + outerJoinStack.pop(); + } return failed; + } } } @@ -7103,7 +7213,7 @@ int processWhere(SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& csep, const s // is pushed to rcWorkStack. if (gwi.ptWorkStack.empty() && !gwi.rcWorkStack.empty()) { - filters = new ParseTree(gwi.rcWorkStack.top()); + gwi.ptWorkStack.push(new ParseTree(gwi.rcWorkStack.top())); gwi.rcWorkStack.pop(); } @@ -7156,6 +7266,7 @@ int processWhere(SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& csep, const s "columnstore_max_allowed_in_values " "threshold.", gwi); + delete filters; return ER_CHECK_NOT_IMPLEMENTED; } @@ -7180,6 +7291,26 @@ int processWhere(SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& csep, const s csep->filters(filters); } + if (!gwi.rcWorkStack.empty()) + { + while(!gwi.rcWorkStack.empty()) + { + ReturnedColumn* t = gwi.rcWorkStack.top(); + delete t; + gwi.rcWorkStack.pop(); + } + } + if (!gwi.ptWorkStack.empty()) + { + while(!gwi.ptWorkStack.empty()) + { + ParseTree* t = gwi.ptWorkStack.top(); + delete t; + gwi.ptWorkStack.pop(); + } + } + + return 0; } @@ -7515,7 +7646,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i vector funcFieldVec; // empty rcWorkStack and ptWorkStack. They should all be empty by now. - clearStacks(gwi); + clearStacks(gwi, false, true); // indicate the starting pos of scalar returned column, because some join column // has been inserted to the returned column list. @@ -7852,6 +7983,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i gwi.parseErrorText = "Unsupported Item in SELECT subquery."; setError(gwi.thd, ER_CHECK_NOT_IMPLEMENTED, gwi.parseErrorText, gwi); + return ER_CHECK_NOT_IMPLEMENTED; } @@ -7963,8 +8095,8 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i // Having clause handling gwi.clauseType = HAVING; - clearStacks(gwi); - ParseTree* havingFilter = 0; + clearStacks(gwi, false, true); + std::unique_ptr havingFilter; // clear fatalParseError that may be left from post process functions gwi.fatalParseError = false; gwi.parseErrorText = ""; @@ -7990,20 +8122,20 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i // @bug 4215. some function filter will be in the rcWorkStack. 
if (gwi.ptWorkStack.empty() && !gwi.rcWorkStack.empty()) { - havingFilter = new ParseTree(gwi.rcWorkStack.top()); + gwi.ptWorkStack.push(new ParseTree(gwi.rcWorkStack.top())); gwi.rcWorkStack.pop(); } while (!gwi.ptWorkStack.empty()) { - havingFilter = gwi.ptWorkStack.top(); + havingFilter.reset(gwi.ptWorkStack.top()); gwi.ptWorkStack.pop(); if (gwi.ptWorkStack.empty()) break; ptp = new ParseTree(new LogicOperator("and")); - ptp->left(havingFilter); + ptp->left(havingFilter.release()); rhs = gwi.ptWorkStack.top(); gwi.ptWorkStack.pop(); ptp->right(rhs); @@ -8034,9 +8166,9 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i if (havingFilter) { ParseTree* ptp = new ParseTree(new LogicOperator("and")); - ptp->left(havingFilter); + ptp->left(havingFilter.release()); ptp->right(inToExistsFilter); - havingFilter = ptp; + havingFilter.reset(ptp); } else { @@ -8266,6 +8398,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i { if (strcasecmp(sc->alias().c_str(), gwi.returnedCols[j]->alias().c_str()) == 0) { + delete rc; rc = gwi.returnedCols[j].get()->clone(); rc->orderPos(j); break; @@ -8278,6 +8411,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i { if (ifp->name.length && string(ifp->name.str) == gwi.returnedCols[j].get()->alias()) { + delete rc; rc = gwi.returnedCols[j].get()->clone(); rc->orderPos(j); break; @@ -8629,21 +8763,6 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i { } } - else if (ord_item->type() == Item::FUNC_ITEM) - { - // @bug5636. check if this order by column is on the select list - ReturnedColumn* rc = buildFunctionColumn((Item_func*)(ord_item), gwi, gwi.fatalParseError); - - for (uint32_t i = 0; i < gwi.returnedCols.size(); i++) - { - if (rc && rc->operator==(gwi.returnedCols[i].get())) - { - ostringstream oss; - oss << i + 1; - break; - } - } - } } } @@ -8746,20 +8865,28 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i csep->orderByCols(gwi.orderByCols); csep->returnedCols(gwi.returnedCols); csep->columnMap(gwi.columnMap); - csep->having(havingFilter); + csep->having(havingFilter.release()); csep->derivedTableList(gwi.derivedTbList); csep->selectSubList(selectSubList); csep->subSelectList(gwi.subselectList); - clearStacks(gwi); return 0; } int cp_get_table_plan(THD* thd, SCSEP& csep, cal_table_info& ti, long timeZone) { - gp_walk_info* gwi = ti.condInfo; - if (!gwi) - gwi = new gp_walk_info(timeZone); + SubQueryChainHolder chainHolder; + bool allocated = false; + gp_walk_info* gwi; + if (ti.condInfo) + { + gwi = &ti.condInfo->gwi; + } + else + { + allocated = true; + gwi = new gp_walk_info(timeZone, &chainHolder.chain); + } gwi->thd = thd; LEX* lex = thd->lex; @@ -8810,7 +8937,7 @@ int cp_get_table_plan(THD* thd, SCSEP& csep, cal_table_info& ti, long timeZone) // get filter if (ti.condInfo) { - gp_walk_info* gwi = ti.condInfo; + gp_walk_info* gwi = &ti.condInfo->gwi; ParseTree* filters = 0; ParseTree* ptp = 0; ParseTree* rhs = 0; @@ -8856,6 +8983,10 @@ int cp_get_table_plan(THD* thd, SCSEP& csep, cal_table_info& ti, long timeZone) // @bug 3321. 
Set max number of blocks in a dictionary file to be scanned for filtering csep->stringScanThreshold(get_string_scan_threshold(gwi->thd)); + if (allocated) + { + delete gwi; + } return 0; } @@ -8865,7 +8996,8 @@ int cp_get_group_plan(THD* thd, SCSEP& csep, cal_impl_if::cal_group_info& gi) const char* timeZone = thd->variables.time_zone->get_name()->ptr(); long timeZoneOffset; dataconvert::timeZoneToOffset(timeZone, strlen(timeZone), &timeZoneOffset); - gp_walk_info gwi(timeZoneOffset); + SubQuery* chain = nullptr; + gp_walk_info gwi(timeZoneOffset, &chain); gwi.thd = thd; gwi.isGroupByHandler = true; int status = getGroupPlan(gwi, *select_lex, csep, gi); @@ -9003,6 +9135,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro #ifdef DEBUG_WALK_COND cerr << "getGroupPlan()" << endl; #endif + idbassert_s(false, "getGroupPlan is utterly out of date"); // XXX: rollup is currently not supported (not tested) in this part. // but this is not triggered in any of tests. @@ -9103,21 +9236,21 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro SELECT_LEX* select_cursor = table_ptr->derived->first_select(); // Use Pushdown handler for subquery processing - FromSubQuery fromSub(gwi, select_cursor); + FromSubQuery* fromSub = new FromSubQuery(gwi, select_cursor); string alias(table_ptr->alias.str); if (lower_case_table_names) { boost::algorithm::to_lower(alias); } - fromSub.alias(alias); + fromSub->alias(alias); CalpontSystemCatalog::TableAliasName tn = make_aliasview("", "", alias, viewName); // @bug 3852. check return execplan - SCSEP plan = fromSub.transform(); + SCSEP plan = fromSub->transform(); if (!plan) { - setError(gwi.thd, ER_INTERNAL_ERROR, fromSub.gwip().parseErrorText, gwi); + setError(gwi.thd, ER_INTERNAL_ERROR, fromSub->gwip().parseErrorText, gwi); CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); return ER_INTERNAL_ERROR; } @@ -9369,7 +9502,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro bool redo = false; // empty rcWorkStack and ptWorkStack. They should all be empty by now. - clearStacks(gwi); + clearStacks(gwi, false); // indicate the starting pos of scalar returned column, because some join column // has been inserted to the returned column list. @@ -9819,7 +9952,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro // Having clause handling gwi.clauseType = HAVING; - clearStacks(gwi); + clearStacks(gwi, false); ParseTree* havingFilter = 0; // clear fatalParseError that may be left from post process functions gwi.fatalParseError = false; diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index fc0ec5078..955602b62 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -128,6 +128,7 @@ using namespace funcexp; #include "ha_mcs_datatype.h" #include "statistics.h" #include "ha_mcs_logging.h" +#include "ha_subquery.h" namespace cal_impl_if { @@ -332,7 +333,7 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci, long t std::vector& colTypes = ti.tpl_scan_ctx->ctp; - RowGroup* rowGroup = ti.tpl_scan_ctx->rowGroup; + std::shared_ptr rowGroup = ti.tpl_scan_ctx->rowGroup; // table mode mysql expects all columns of the table. mapping between columnoid and position in rowgroup // set coltype.position to be the position in rowgroup. only set once. 
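Several call sites in this patch replace a lazily allocated raw rowgroup::RowGroup* with a smart pointer that is reset() on first use, so early returns and exceptions no longer leak the object. A simplified sketch of that lazy reset-on-first-use pattern, with a stand-in Payload type instead of the real RowGroup/ByteStream API:

    #include <iostream>
    #include <memory>
    #include <string>

    struct Payload
    {
      void deserialize(const std::string& bytes) { data = bytes; }
      std::string data;
    };

    void consume(const std::string& msg, std::shared_ptr<Payload>& payload)
    {
      if (!payload)
      {
        // first message carries the metadata: allocate once, owned by the smart pointer
        payload.reset(new Payload);
        payload->deserialize(msg);
        return;
      }
      // subsequent messages reuse the already-deserialized object
      std::cout << "already have: " << payload->data << "\n";
    }

    int main()
    {
      std::shared_ptr<Payload> payload;  // starts empty, like the rowGroup pointers above
      consume("meta", payload);
      consume("row", payload);
      return 0;  // payload is freed automatically, even when consume() returns early
    }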
@@ -656,7 +657,7 @@ vector getOnUpdateTimestampColumns(string& schema, string& tableName, in rowgroup::RGData rgData; ByteStream::quadbyte qb = 4; msg << qb; - rowgroup::RowGroup* rowGroup = 0; + std::unique_ptr rowGroup; uint32_t rowCount; exemgrClient->write(msg); @@ -709,7 +710,7 @@ vector getOnUpdateTimestampColumns(string& schema, string& tableName, in if (!rowGroup) { // This is mete data - rowGroup = new rowgroup::RowGroup(); + rowGroup.reset(new rowgroup::RowGroup()); rowGroup->deserialize(msg); qb = 100; msg.restart(); @@ -888,7 +889,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c //@Bug 2753. the memory already freed by destructor of UpdateSqlStatement if (ha_mcs_common::isUpdateStatement(thd->lex->sql_command)) { - ColumnAssignment* columnAssignmentPtr; + ColumnAssignment* columnAssignmentPtr = nullptr; Item_field* item; List_iterator_fast field_it(thd->lex->first_select_lex()->item_list); List_iterator_fast value_it(thd->lex->value_list); @@ -919,6 +920,8 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c } else if (strcmp(tableName.c_str(), tmpTableName.c_str()) != 0) { + delete colAssignmentListPtr; + delete columnAssignmentPtr; //@ Bug3326 error out for multi table update string emsg(IDBErrorInfo::instance()->errorMsg(ERR_UPDATE_NOT_SUPPORT_FEATURE)); thd->raise_error_printf(ER_CHECK_NOT_IMPLEMENTED, emsg.c_str()); @@ -929,6 +932,8 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c if (!item->db_name.str) { + delete colAssignmentListPtr; + delete columnAssignmentPtr; //@Bug 5312. if subselect, wait until the schema info is available. if (thd->derived_tables_processing) return 0; @@ -1003,7 +1008,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c // sysdate() etc. if (!hasNonSupportItem && !cal_impl_if::nonConstFunc(ifp) && tmpVec.size() == 0) { - gp_walk_info gwi2(gwi.timeZone); + gp_walk_info gwi2(gwi.timeZone, gwi.subQueriesChain); gwi2.thd = thd; SRCP srcp(buildReturnedColumn(value, gwi2, gwi2.fatalParseError)); ConstantColumn* constCol = dynamic_cast(srcp.get()); @@ -1114,6 +1119,8 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c } else if (value->type() == Item::WINDOW_FUNC_ITEM) { + delete colAssignmentListPtr; + delete columnAssignmentPtr; setError(thd, ER_INTERNAL_ERROR, logging::IDBErrorInfo::instance()->errorMsg(ERR_WF_UPDATE)); return ER_CHECK_NOT_IMPLEMENTED; } @@ -1178,6 +1185,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c if (colAssignmentListPtr->empty() && ha_mcs_common::isUpdateStatement(thd->lex->sql_command)) { ci->affectedRows = 0; + delete colAssignmentListPtr; return 0; } @@ -1205,7 +1213,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c boost::algorithm::to_lower(aTableName.table); } - CalpontDMLPackage* pDMLPackage = 0; + std::shared_ptr pDMLPackage; // dmlStmt += ";"; IDEBUG(cout << "STMT: " << dmlStmt << " and sessionID " << thd->thread_id << endl); VendorDMLStatement dmlStatement(dmlStmt, sessionID); @@ -1215,7 +1223,6 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c else dmlStatement.set_DMLStatementType(DML_DELETE); - TableName* qualifiedTablName = new TableName(); UpdateSqlStatement updateStmt; //@Bug 2753. To make sure the momory is freed. 
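pDMLPackage above changes from a raw pointer that had to be deleted on every path into a shared_ptr that takes ownership of the factory result as soon as it is produced. A small sketch of wrapping a raw-pointer factory this way, with hypothetical makePackage/Package names standing in for the CalpontDMLFactory API:

    #include <iostream>
    #include <memory>
    #include <string>

    struct Package
    {
      explicit Package(const std::string& stmt) : statement(stmt) {}
      std::string statement;
    };

    // a legacy-style factory that returns a raw owning pointer (may return nullptr)
    Package* makePackage(const std::string& stmt)
    {
      if (stmt.empty())
        return nullptr;
      return new Package(stmt);
    }

    int process(const std::string& stmt)
    {
      // take ownership immediately; every early return below now frees the package
      std::shared_ptr<Package> pkg(makePackage(stmt));

      if (!pkg)
        return 1;  // factory failure

      if (pkg->statement == "unsupported")
        return 2;  // error path: no manual delete needed

      std::cout << "processing " << pkg->statement << "\n";
      return 0;
    }

    int main()
    {
      return process("UPDATE t SET c = 1");
    }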
@@ -1223,10 +1230,11 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c if (ha_mcs_common::isUpdateStatement(thd->lex->sql_command)) { + TableName* qualifiedTablName = new TableName(); qualifiedTablName->fName = tableName; qualifiedTablName->fSchema = schemaName; updateStmt.fNamePtr = qualifiedTablName; - pDMLPackage = CalpontDMLFactory::makeCalpontUpdatePackageFromMysqlBuffer(dmlStatement, updateStmt); + pDMLPackage.reset(CalpontDMLFactory::makeCalpontUpdatePackageFromMysqlBuffer(dmlStatement, updateStmt)); } else if ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) //@Bug 6121 error out on multi tables delete. { @@ -1246,9 +1254,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c boost::algorithm::to_lower(tableName); boost::algorithm::to_lower(aliasName); } - qualifiedTablName->fName = tableName; - qualifiedTablName->fSchema = schemaName; - pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(dmlStatement); + pDMLPackage.reset(CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(dmlStatement)); } else { @@ -1271,9 +1277,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c boost::algorithm::to_lower(tableName); boost::algorithm::to_lower(aliasName); } - qualifiedTablName->fName = tableName; - qualifiedTablName->fSchema = schemaName; - pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(dmlStatement); + pDMLPackage.reset(CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(dmlStatement)); } } else @@ -1288,9 +1292,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c boost::algorithm::to_lower(tableName); boost::algorithm::to_lower(aliasName); } - qualifiedTablName->fName = tableName; - qualifiedTablName->fSchema = schemaName; - pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(dmlStatement); + pDMLPackage.reset(CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(dmlStatement)); } if (!pDMLPackage) @@ -1549,7 +1551,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c updateCP->serialize(*plan); pDMLPackage->write(bytestream); - delete pDMLPackage; + pDMLPackage.reset(); ByteStream::byte b = 0; ByteStream::octbyte rows = 0; @@ -1633,12 +1635,12 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c // cout << "doUpdateDelete start new DMLProc client for ctrl-c " << " for session " << sessionID // << endl; VendorDMLStatement cmdStmt("CTRL+C", DML_COMMAND, sessionID); - CalpontDMLPackage* pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(cmdStmt); + std::shared_ptr pDMLPackage(CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(cmdStmt)); pDMLPackage->set_TimeZone(timeZoneOffset); ByteStream bytestream; bytestream << static_cast(sessionID); pDMLPackage->write(bytestream); - delete pDMLPackage; + pDMLPackage.reset(); b = 1; retry = maxRetries; errorMsg = "Command canceled by user"; @@ -1756,13 +1758,13 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c if (command != "") { VendorDMLStatement cmdStmt(command, DML_COMMAND, sessionID); - CalpontDMLPackage* pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(cmdStmt); + std::shared_ptr pDMLPackage(CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(cmdStmt)); pDMLPackage->set_TimeZone(timeZoneOffset); pDMLPackage->setTableOid(ci->tableOid); ByteStream bytestream; bytestream << static_cast(sessionID); pDMLPackage->write(bytestream); - delete pDMLPackage; + 
pDMLPackage.reset(); ByteStream::byte bc; std::string errMsg; @@ -2145,7 +2147,8 @@ int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows* affected_rows, const char* timeZone = thd->variables.time_zone->get_name()->ptr(); long timeZoneOffset; dataconvert::timeZoneToOffset(timeZone, strlen(timeZone), &timeZoneOffset); - cal_impl_if::gp_walk_info gwi(timeZoneOffset); + SubQueryChainHolder chainHolder; + cal_impl_if::gp_walk_info gwi(timeZoneOffset, &chainHolder.chain); gwi.thd = thd; int rc = 0; @@ -2168,6 +2171,7 @@ int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows* affected_rows, *affected_rows = ci->affectedRows; } + return rc; } @@ -2178,7 +2182,8 @@ int ha_mcs::impl_rnd_init(TABLE* table, const std::vector& condStack) const char* timeZone = thd->variables.time_zone->get_name()->ptr(); long timeZoneOffset; dataconvert::timeZoneToOffset(timeZone, strlen(timeZone), &timeZoneOffset); - gp_walk_info gwi(timeZoneOffset); + SubQueryChainHolder chainHolder; + gp_walk_info gwi(timeZoneOffset, &chainHolder.chain); gwi.thd = thd; if (thd->slave_thread && !get_replication_slave(thd) && @@ -2452,9 +2457,9 @@ int ha_mcs::impl_rnd_init(TABLE* table, const std::vector& condStack) ti = ci->tableMap[table]; ti.msTablePtr = table; - if (ti.tpl_ctx == nullptr) + if (!ti.tpl_ctx) { - ti.tpl_ctx = new sm::cpsm_tplh_t(); + ti.tpl_ctx.reset(new sm::cpsm_tplh_t()); ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t()); } @@ -2729,7 +2734,6 @@ int ha_mcs_impl_rnd_end(TABLE* table, bool is_pushdown_hand) else ci->cal_conn_hndl = hndl; - ti.tpl_ctx = 0; } catch (IDBExcept& e) { @@ -3737,6 +3741,13 @@ int ha_mcs_impl_delete_row(const uchar* buf) return 0; } +// this place is as good as any. +ext_cond_info::ext_cond_info(long timeZone) + : chainHolder(new SubQueryChainHolder()) + , gwi(timeZone, &chainHolder->chain) +{ +} + COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector& condStack) { THD* thd = current_thd; @@ -3766,7 +3777,8 @@ COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector& condSt const char* timeZone = thd->variables.time_zone->get_name()->ptr(); long timeZoneOffset; dataconvert::timeZoneToOffset(timeZone, strlen(timeZone), &timeZoneOffset); - gp_walk_info gwi(timeZoneOffset); + SubQueryChainHolder chainHolder; + gp_walk_info gwi(timeZoneOffset, &chainHolder.chain); gwi.condPush = true; gwi.sessionid = tid2sid(thd->thread_id); cout << "------------------ cond push -----------------------" << endl; @@ -3782,16 +3794,17 @@ COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector& condSt const char* timeZone = thd->variables.time_zone->get_name()->ptr(); long timeZoneOffset; dataconvert::timeZoneToOffset(timeZone, strlen(timeZone), &timeZoneOffset); - ti.condInfo = new gp_walk_info(timeZoneOffset); + ti.condInfo = new ext_cond_info(timeZoneOffset); //new gp_walk_info(timeZoneOffset); } - gp_walk_info* gwi = ti.condInfo; + gp_walk_info* gwi = &ti.condInfo->gwi; gwi->dropCond = false; gwi->fatalParseError = false; gwi->condPush = true; gwi->thd = thd; gwi->sessionid = tid2sid(thd->thread_id); cond->traverse_cond(gp_walk, gwi, Item::POSTFIX); + clearDeleteStacks(*gwi); ci->tableMap[table] = ti; if (gwi->fatalParseError) @@ -4153,12 +4166,12 @@ int ha_mcs_impl_group_by_init(mcs_handler_info* handler_info, TABLE* table) mapiter = ci->tableMap.find(tl->table); if (mapiter != ci->tableMap.end() && mapiter->second.condInfo != NULL && - mapiter->second.condInfo->condPush) + mapiter->second.condInfo->gwi.condPush) { - while 
(!mapiter->second.condInfo->ptWorkStack.empty()) + while (!mapiter->second.condInfo->gwi.ptWorkStack.empty()) { - ptIt = mapiter->second.condInfo->ptWorkStack.top(); - mapiter->second.condInfo->ptWorkStack.pop(); + ptIt = mapiter->second.condInfo->gwi.ptWorkStack.top(); + mapiter->second.condInfo->gwi.ptWorkStack.pop(); gi.pushedPts.push_back(ptIt); } } @@ -4338,7 +4351,7 @@ int ha_mcs_impl_group_by_init(mcs_handler_info* handler_info, TABLE* table) { // MCOL-1601 Using stacks of ExeMgr conn hndls, table and scan contexts. - ti.tpl_ctx = new sm::cpsm_tplh_t(); + ti.tpl_ctx.reset(new sm::cpsm_tplh_t()); ti.tpl_ctx_st.push(ti.tpl_ctx); ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t()); ti.tpl_scan_ctx_st.push(ti.tpl_scan_ctx); @@ -4593,6 +4606,7 @@ int ha_mcs_impl_group_by_end(TABLE* table) { bool ask_4_stats = (ci->traceFlags) ? true : false; sm::tpl_close(ti.tpl_ctx, &hndl, ci->stats, ask_4_stats, clearScanCtx); + ti.tpl_ctx = 0; } // Normaly stats variables are set in external_lock method but we set it here // since they we pretend we are in vtable_disabled mode and the stats vars won't be set. @@ -4608,10 +4622,13 @@ int ha_mcs_impl_group_by_end(TABLE* table) ci->miniStats += hndl->miniStats; } } + else + { + ti.tpl_ctx.reset(); + } ci->cal_conn_hndl = hndl; - ti.tpl_ctx = 0; } catch (IDBExcept& e) { @@ -4684,6 +4701,8 @@ int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table, bool IDEBUG(cout << "pushdown_init for table " << endl); THD* thd = current_thd; + SubQueryChainHolder chainHolder; + if (thd->slave_thread && !get_replication_slave(thd) && ha_mcs_common::isDMLStatement(thd->lex->sql_command)) return 0; @@ -4691,7 +4710,7 @@ int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table, bool const char* timeZone = thd->variables.time_zone->get_name()->ptr(); long timeZoneOffset; dataconvert::timeZoneToOffset(timeZone, strlen(timeZone), &timeZoneOffset); - gp_walk_info gwi(timeZoneOffset); + gp_walk_info gwi(timeZoneOffset, &chainHolder.chain); gwi.thd = thd; bool err = false; @@ -5049,7 +5068,7 @@ int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table, bool { if (ti.tpl_ctx == 0) { - ti.tpl_ctx = new sm::cpsm_tplh_t(); + ti.tpl_ctx.reset(new sm::cpsm_tplh_t()); ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t()); } @@ -5174,7 +5193,7 @@ int ha_mcs_impl_select_next(uchar* buf, TABLE* table, long timeZone) { if (ti.tpl_ctx == 0) { - ti.tpl_ctx = new sm::cpsm_tplh_t(); + ti.tpl_ctx.reset(new sm::cpsm_tplh_t()); ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t()); } diff --git a/dbcon/mysql/ha_mcs_impl_if.h b/dbcon/mysql/ha_mcs_impl_if.h index 010b056c7..98b488f57 100644 --- a/dbcon/mysql/ha_mcs_impl_if.h +++ b/dbcon/mysql/ha_mcs_impl_if.h @@ -179,7 +179,15 @@ struct gp_walk_info TableOnExprList tableOnExprList; std::vector condList; - gp_walk_info(long timeZone_) + // All SubQuery allocations are single-linked into this chain. + // At the end of gp_walk_info processing we can free whole chain at once. + // This is done so because the juggling of SubQuery pointers in the + // ha_mcs_execplan code. + // There is a struct SubQueryChainHolder down below to hold chain root and free + // the chain using sorta kinda RAII. 
+ SubQuery** subQueriesChain; + + gp_walk_info(long timeZone_, SubQuery** subQueriesChain_) : sessionid(0) , fatalParseError(false) , condPush(false) @@ -204,12 +212,21 @@ struct gp_walk_info , timeZone(timeZone_) , inSubQueryLHS(nullptr) , inSubQueryLHSItem(nullptr) + , subQueriesChain(subQueriesChain_) { } + ~gp_walk_info(); - ~gp_walk_info() - { - } +}; + +struct SubQueryChainHolder; +struct ext_cond_info +{ + // having this as a direct field would introduce + // circular dependency on header inclusion with ha_subquery.h. + boost::shared_ptr chainHolder; + gp_walk_info gwi; + ext_cond_info(long timeZone); // needs knowledge on SubQueryChainHolder, will be defined elsewhere }; struct cal_table_info @@ -223,17 +240,14 @@ struct cal_table_info cal_table_info() : tpl_ctx(0), c(0), msTablePtr(0), conn_hndl(0), condInfo(0), moreRows(false) { } - ~cal_table_info() - { - } - sm::cpsm_tplh_t* tpl_ctx; - std::stack tpl_ctx_st; + sm::sp_cpsm_tplh_t tpl_ctx; + std::stack tpl_ctx_st; sm::sp_cpsm_tplsch_t tpl_scan_ctx; std::stack tpl_scan_ctx_st; unsigned c; // for debug purpose TABLE* msTablePtr; // no ownership sm::cpsm_conhdl_t* conn_hndl; - gp_walk_info* condInfo; + ext_cond_info* condInfo; execplan::SCSEP csep; bool moreRows; // are there more rows to consume (b/c of limit) }; @@ -400,6 +414,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& cse void setError(THD* thd, uint32_t errcode, const std::string errmsg, gp_walk_info* gwi); void setError(THD* thd, uint32_t errcode, const std::string errmsg); void gp_walk(const Item* item, void* arg); +void clearDeleteStacks(gp_walk_info& gwi); void parse_item(Item* item, std::vector& field_vec, bool& hasNonSupportItem, uint16& parseInfo, gp_walk_info* gwip = NULL); const std::string bestTableName(const Item_field* ifp); diff --git a/dbcon/mysql/ha_pseudocolumn.cpp b/dbcon/mysql/ha_pseudocolumn.cpp index 7bd75b6a7..1da0eaa01 100644 --- a/dbcon/mysql/ha_pseudocolumn.cpp +++ b/dbcon/mysql/ha_pseudocolumn.cpp @@ -475,6 +475,7 @@ execplan::ReturnedColumn* buildPseudoColumn(Item* item, gp_walk_info& gwi, bool& funcexp::Func_idbpartition* idbpartition = new funcexp::Func_idbpartition(); fc->operationType(idbpartition->operationType(parms, fc->resultType())); fc->alias(ifp->full_name() ? 
ifp->full_name() : ""); + delete idbpartition; return fc; } diff --git a/dbcon/mysql/ha_scalar_sub.cpp b/dbcon/mysql/ha_scalar_sub.cpp index c1014c9f4..e2afd8367 100644 --- a/dbcon/mysql/ha_scalar_sub.cpp +++ b/dbcon/mysql/ha_scalar_sub.cpp @@ -230,6 +230,7 @@ execplan::ParseTree* ScalarSub::buildParseTree(PredicateOperator* op) { fGwip.fatalParseError = true; fGwip.parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_INVALID_OPERATOR_WITH_LIST); + delete op; return NULL; } @@ -244,7 +245,7 @@ execplan::ParseTree* ScalarSub::buildParseTree(PredicateOperator* op) csep->subType(CalpontSelectExecutionPlan::SINGLEROW_SUBS); // gwi for the sub query - gp_walk_info gwi(fGwip.timeZone); + gp_walk_info gwi(fGwip.timeZone, fGwip.subQueriesChain); gwi.thd = fGwip.thd; gwi.subQuery = this; @@ -270,6 +271,7 @@ execplan::ParseTree* ScalarSub::buildParseTree(PredicateOperator* op) fGwip.parseErrorText = gwi.parseErrorText; } + delete op; return NULL; } diff --git a/dbcon/mysql/ha_select_sub.cpp b/dbcon/mysql/ha_select_sub.cpp index 5a83e245b..3939c11b4 100644 --- a/dbcon/mysql/ha_select_sub.cpp +++ b/dbcon/mysql/ha_select_sub.cpp @@ -67,7 +67,7 @@ SCSEP SelectSubQuery::transform() csep->subType(CalpontSelectExecutionPlan::SELECT_SUBS); // gwi for the sub query - gp_walk_info gwi(fGwip.timeZone); + gp_walk_info gwi(fGwip.timeZone, fGwip.subQueriesChain); gwi.thd = fGwip.thd; gwi.subQuery = this; diff --git a/dbcon/mysql/ha_subquery.h b/dbcon/mysql/ha_subquery.h index 029db0051..8195c520e 100644 --- a/dbcon/mysql/ha_subquery.h +++ b/dbcon/mysql/ha_subquery.h @@ -45,6 +45,8 @@ class SubQuery public: SubQuery(gp_walk_info& gwip) : fGwip(gwip), fCorrelated(false) { + next = *gwip.subQueriesChain; + *gwip.subQueriesChain = this; } virtual ~SubQuery() { @@ -68,11 +70,28 @@ class SubQuery { } + SubQuery* next; protected: gp_walk_info& fGwip; bool fCorrelated; }; +struct SubQueryChainHolder +{ + SubQuery* chain; + SubQueryChainHolder () : chain(nullptr) { } + ~SubQueryChainHolder () + { + while (chain) + { + SubQuery* next = chain->next; + delete chain; + chain = next; + } + } +}; + + /** * @brief A class to represent a generic WHERE clause subquery */ diff --git a/dbcon/mysql/ha_view.cpp b/dbcon/mysql/ha_view.cpp index b419ee9ee..e78cea5da 100644 --- a/dbcon/mysql/ha_view.cpp +++ b/dbcon/mysql/ha_view.cpp @@ -66,7 +66,7 @@ void View::transform() csep->sessionID(fParentGwip->sessionid); // gwi for the sub query - gp_walk_info gwi(fParentGwip->timeZone); + gp_walk_info gwi(fParentGwip->timeZone, fParentGwip->subQueriesChain); gwi.thd = fParentGwip->thd; uint32_t sessionID = csep->sessionID(); @@ -151,6 +151,7 @@ void View::transform() if (gwi.fatalParseError) { setError(gwi.thd, ER_INTERNAL_ERROR, gwi.parseErrorText); + delete csep; return; } } @@ -158,6 +159,7 @@ void View::transform() { setError(gwi.thd, ER_INTERNAL_ERROR, ie.what()); CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); + delete csep; return; } catch (...) 
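ha_subquery.h above threads every SubQuery allocated during a walk into an intrusive singly linked list whose root lives in a SubQueryChainHolder, so a single holder destructor frees the whole chain regardless of how the pointers were juggled in between. A minimal self-contained sketch of that ownership pattern, with hypothetical Node/ChainHolder names in place of SubQuery/SubQueryChainHolder:

    #include <iostream>

    struct Node
    {
      // every Node links itself into the chain owned by the holder on construction
      Node(Node** chainRoot) : next(*chainRoot)
      {
        *chainRoot = this;
      }
      Node* next;
    };

    struct ChainHolder
    {
      Node* chain = nullptr;

      ~ChainHolder()
      {
        // walk the intrusive list and free every node in one place
        while (chain)
        {
          Node* next = chain->next;
          delete chain;
          chain = next;
        }
      }
    };

    int main()
    {
      ChainHolder holder;
      new Node(&holder.chain);   // allocations can be "forgotten" by the caller...
      new Node(&holder.chain);
      new Node(&holder.chain);
      std::cout << "three nodes chained\n";
      return 0;                  // ...the holder deletes all of them here
    }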
@@ -165,6 +167,7 @@ void View::transform() string emsg = IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR); setError(gwi.thd, ER_INTERNAL_ERROR, emsg); CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); + delete csep; return; } @@ -177,6 +180,7 @@ void View::transform() // merge view list to parent fParentGwip->viewList.insert(fParentGwip->viewList.end(), gwi.viewList.begin(), gwi.viewList.end()); + gwi.viewList.clear(); // merge non-collapsed outer join to parent select stack tmpstack; @@ -192,6 +196,12 @@ void View::transform() fParentGwip->ptWorkStack.push(tmpstack.top()); tmpstack.pop(); } + while (!gwi.rcWorkStack.empty()) { + delete gwi.rcWorkStack.top(); + gwi.rcWorkStack.pop(); + } + + delete csep; } uint32_t View::processJoin(gp_walk_info& gwi, std::stack& outerJoinStack) diff --git a/dbcon/mysql/ha_window_function.cpp b/dbcon/mysql/ha_window_function.cpp index a30ba0b71..b3d992077 100644 --- a/dbcon/mysql/ha_window_function.cpp +++ b/dbcon/mysql/ha_window_function.cpp @@ -316,7 +316,7 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n Item_window_func* wf = (Item_window_func*)item; Item_sum* item_sum = wf->window_func(); string funcName = ConvertFuncName(item_sum); - WindowFunctionColumn* ac = new WindowFunctionColumn(funcName); + std::unique_ptr ac(new WindowFunctionColumn(funcName)); ac->timeZone(gwi.timeZone); ac->distinct(item_sum->has_with_distinct()); Window_spec* win_spec = wf->window_spec; @@ -902,8 +902,10 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n ac->charsetNumber(item->collation.collation->number); // put ac on windowFuncList - gwi.windowFuncList.push_back(ac); - return ac; + // we clone our managed pointer to put it into uunmanaged world. 
+ WindowFunctionColumn* retAC = ac.release(); + gwi.windowFuncList.push_back(retAC); + return retAC; } } // namespace cal_impl_if diff --git a/dbcon/mysql/is_columnstore_extents.cpp b/dbcon/mysql/is_columnstore_extents.cpp index 0d6b00c3d..c12793cbf 100644 --- a/dbcon/mysql/is_columnstore_extents.cpp +++ b/dbcon/mysql/is_columnstore_extents.cpp @@ -187,7 +187,6 @@ static int generate_result(BRM::OID_t oid, BRM::DBRM* emp, TABLE* table, THD* th if (schema_table_store_record(thd, table)) { - delete emp; return 1; } @@ -197,13 +196,27 @@ static int generate_result(BRM::OID_t oid, BRM::DBRM* emp, TABLE* table, THD* th return 0; } +struct refresher +{ + BRM::DBRM* guarded; + refresher() + { + guarded = new BRM::DBRM(); + } + ~refresher() + { + delete guarded; + BRM::DBRM::refreshShm(); + } +}; static int is_columnstore_extents_fill(THD* thd, TABLE_LIST* tables, COND* cond) { BRM::OID_t cond_oid = 0; TABLE* table = tables->table; - BRM::DBRM::refreshShm(); - BRM::DBRM* emp = new BRM::DBRM(); + BRM::DBRM* emp; + refresher shmRefresher; + emp = shmRefresher.guarded; if (!emp || !emp->isDBRMReady()) { @@ -289,7 +302,6 @@ static int is_columnstore_extents_fill(THD* thd, TABLE_LIST* tables, COND* cond) return 1; } - delete emp; return 0; } diff --git a/dbcon/mysql/sm.cpp b/dbcon/mysql/sm.cpp index 098b2a655..caf6f156e 100644 --- a/dbcon/mysql/sm.cpp +++ b/dbcon/mysql/sm.cpp @@ -280,7 +280,7 @@ namespace sm { const std::string DEFAULT_SAVE_PATH = "/var/tmp"; -status_t tpl_open(tableid_t tableid, cpsm_tplh_t* ntplh, cpsm_conhdl_t* conn_hdl) +status_t tpl_open(tableid_t tableid, sp_cpsm_tplh_t& ntplh, cpsm_conhdl_t* conn_hdl) { SMDEBUGLOG << "tpl_open: ntplh: " << ntplh << " conn_hdl: " << conn_hdl << " tableid: " << tableid << endl; @@ -359,7 +359,7 @@ status_t tpl_scan_close(sp_cpsm_tplsch_t& ntplsch) return STATUS_OK; } -status_t tpl_close(cpsm_tplh_t* ntplh, cpsm_conhdl_t** conn_hdl, QueryStats& stats, bool ask_4_stats, +status_t tpl_close(sp_cpsm_tplh_t& ntplh, cpsm_conhdl_t** conn_hdl, QueryStats& stats, bool ask_4_stats, bool clear_scan_ctx) { cpsm_conhdl_t* hndl = *conn_hdl; @@ -369,7 +369,7 @@ status_t tpl_close(cpsm_tplh_t* ntplh, cpsm_conhdl_t** conn_hdl, QueryStats& sta SMDEBUGLOG << " tableid: " << ntplh->tableid; SMDEBUGLOG << endl; - delete ntplh; + ntplh.reset(); // determine end of result set and end of statement execution if (hndl->queryState == QUERY_IN_PROCESS) diff --git a/dbcon/mysql/sm.h b/dbcon/mysql/sm.h index 06ad59ba2..0e055f8e0 100644 --- a/dbcon/mysql/sm.h +++ b/dbcon/mysql/sm.h @@ -137,12 +137,11 @@ struct cpsm_tplsch_t } ~cpsm_tplsch_t() { - delete rowGroup; } tableid_t tableid; uint64_t rowsreturned; - rowgroup::RowGroup* rowGroup; + std::shared_ptr rowGroup; messageqcpp::ByteStream bs; // rowgroup bytestream. 
need to stay with the life span of rowgroup uint32_t traceFlags; // @bug 649 @@ -158,7 +157,7 @@ struct cpsm_tplsch_t { if (!rowGroup) { - rowGroup = new rowgroup::RowGroup(); + rowGroup.reset(new rowgroup::RowGroup()); rowGroup->deserialize(bs); } else @@ -280,6 +279,7 @@ struct cpsm_tplh_t uint16_t saveFlag; int bandsInTable; }; +typedef std::shared_ptr sp_cpsm_tplh_t; struct cpsm_tid_t { @@ -293,11 +293,11 @@ struct cpsm_tid_t extern status_t sm_init(uint32_t, cpsm_conhdl_t**, uint32_t columnstore_local_query = false); extern status_t sm_cleanup(cpsm_conhdl_t*); -extern status_t tpl_open(tableid_t, cpsm_tplh_t*, cpsm_conhdl_t*); +extern status_t tpl_open(tableid_t, sp_cpsm_tplh_t&, cpsm_conhdl_t*); extern status_t tpl_scan_open(tableid_t, sp_cpsm_tplsch_t&, cpsm_conhdl_t*); extern status_t tpl_scan_fetch(sp_cpsm_tplsch_t&, cpsm_conhdl_t*, int* k = 0); extern status_t tpl_scan_close(sp_cpsm_tplsch_t&); -extern status_t tpl_close(cpsm_tplh_t*, cpsm_conhdl_t**, querystats::QueryStats& stats, bool ask_4_stats, +extern status_t tpl_close(sp_cpsm_tplh_t&, cpsm_conhdl_t**, querystats::QueryStats& stats, bool ask_4_stats, bool clear_scan_ctx = false); } // namespace sm diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index b7632f301..549f92a7c 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -33,10 +33,9 @@ if (WITH_UNITTESTS) target_link_libraries(rowgroup_tests ${ENGINE_LDFLAGS} ${GTEST_LIBRARIES} ${ENGINE_EXEC_LIBS}) gtest_add_tests(TARGET rowgroup_tests TEST_PREFIX columnstore:) - add_executable(rewritetest rewritetest.cpp) add_dependencies(rewritetest googletest) - target_link_libraries(rewritetest ${ENGINE_LDFLAGS} ${GTEST_LIBRARIES} ${ENGINE_EXEC_LIBS} messageqcpp execplan) + target_link_libraries(rewritetest ${ENGINE_LDFLAGS} ${GTEST_LIBRARIES} ${ENGINE_EXEC_LIBS}) gtest_add_tests(TARGET rewritetest TEST_PREFIX columnstore:) add_executable(mcs_decimal_tests mcs_decimal-tests.cpp) diff --git a/tests/scripts/fullmtr.sh b/tests/scripts/fullmtr.sh index 53f5ef20f..f3b16b570 100644 --- a/tests/scripts/fullmtr.sh +++ b/tests/scripts/fullmtr.sh @@ -7,6 +7,9 @@ CURRENT_DIR=`pwd` mysql -e "create database if not exists test;" SOCKET=`mysql -e "show variables like 'socket';" | grep socket | cut -f2` +export ASAN_OPTIONS=abort_on_error=1:disable_coredump=0,print_stats=false,detect_odr_violation=0,check_initialization_order=1,detect_stack_use_after_return=1,atexit=false,log_path=/core/asan.hz + + cd ${INSTALLED_MTR_PATH} if [[ ! -d ${COLUMSNTORE_MTR_INSTALLED} ]]; then @@ -21,7 +24,15 @@ if [[ ! -d '/data/qa/source/dbt3/' || ! -d '/data/qa/source/ssb/' ]]; then fi run_suite() { - ./mtr --force --max-test-fail=0 --testcase-timeout=60 --extern socket=$SOCKET --suite=columnstore/$1 $2 | tee $CURRENT_DIR/mtr.$1.log 2>&1 + ls /core >$CURRENT_DIR/mtr.$1.cores-before + ./mtr --force --max-test-fail=0 --testcase-timeout=60 --suite=columnstore/$1 $2 | tee $CURRENT_DIR/mtr.$1.log 2>&1 + # dump analyses. 
+ systemctl stop mariadb + systemctl start mariadb + ls /core >$CURRENT_DIR/mtr.$1.cores-after + echo "reports or coredumps:" + diff -u $CURRENT_DIR/mtr.$1.cores-before $CURRENT_DIR/mtr.$1.cores-after && echo "no new reports or coredumps" + rm $CURRENT_DIR/mtr.$1.cores-before $CURRENT_DIR/mtr.$1.cores-after } if (( $# == 2 )); then @@ -29,8 +40,9 @@ if (( $# == 2 )); then exit 1 fi -run_suite setup +run_suite basic run_suite bugfixes +run_suite setup run_suite devregression run_suite autopilot run_suite extended diff --git a/versioning/BRM/brmshmimpl.cpp b/versioning/BRM/brmshmimpl.cpp index 82bef3b7c..f901be3fc 100644 --- a/versioning/BRM/brmshmimpl.cpp +++ b/versioning/BRM/brmshmimpl.cpp @@ -422,6 +422,7 @@ BRMManagedShmImplRBTree::BRMManagedShmImplRBTree(unsigned key, off_t size, bool BRMManagedShmImplRBTree::~BRMManagedShmImplRBTree() { + delete fShmSegment; } void BRMManagedShmImplRBTree::setReadOnly() diff --git a/writeengine/bulk/we_bulkload.cpp b/writeengine/bulk/we_bulkload.cpp index 8911d0300..2335c99c2 100644 --- a/writeengine/bulk/we_bulkload.cpp +++ b/writeengine/bulk/we_bulkload.cpp @@ -70,7 +70,7 @@ const std::string ERR_LOG_SUFFIX = ".err"; // Job err log file suffix // extern WriteEngine::BRMWrapper* brmWrapperPtr; namespace WriteEngine { -/* static */ boost::ptr_vector BulkLoad::fTableInfo; +/* static */ std::vector> BulkLoad::fTableInfo; /* static */ boost::mutex* BulkLoad::fDDLMutex = 0; /* static */ const std::string BulkLoad::DIR_BULK_JOB("job"); @@ -519,7 +519,7 @@ void BulkLoad::spawnWorkers() // NO_ERROR if success // other if fail //------------------------------------------------------------------------------ -int BulkLoad::preProcess(Job& job, int tableNo, TableInfo* tableInfo) +int BulkLoad::preProcess(Job& job, int tableNo, std::shared_ptr& tableInfo) { int rc = NO_ERROR, minWidth = 9999; // give a big number HWM minHWM = 999999; // rp 9/25/07 Bug 473 @@ -701,7 +701,7 @@ int BulkLoad::preProcess(Job& job, int tableNo, TableInfo* tableInfo) << "Table-" << job.jobTableList[tableNo].tblName << "..."; fLog.logMsg(oss11.str(), MSGLVL_INFO2); - rc = saveBulkRollbackMetaData(job, tableInfo, segFileInfo, dbRootHWMInfoColVec); + rc = saveBulkRollbackMetaData(job, tableInfo.get(), segFileInfo, dbRootHWMInfoColVec); if (rc != NO_ERROR) { @@ -733,10 +733,10 @@ int BulkLoad::preProcess(Job& job, int tableNo, TableInfo* tableInfo) if (job.jobTableList[tableNo].colList[i].compressionType) info = new ColumnInfoCompressed(&fLog, i, job.jobTableList[tableNo].colList[i], pDBRootExtentTracker, - tableInfo); + tableInfo.get()); // tableInfo->rbMetaWriter()); else - info = new ColumnInfo(&fLog, i, job.jobTableList[tableNo].colList[i], pDBRootExtentTracker, tableInfo); + info = new ColumnInfo(&fLog, i, job.jobTableList[tableNo].colList[i], pDBRootExtentTracker, tableInfo.get()); if (pwd) info->setUIDGID(pwd->pw_uid, pwd->pw_gid); @@ -840,7 +840,7 @@ int BulkLoad::preProcess(Job& job, int tableNo, TableInfo* tableInfo) if (rc) return rc; - fTableInfo.push_back(tableInfo); + fTableInfo.push_back(std::shared_ptr(tableInfo)); return NO_ERROR; } @@ -1039,19 +1039,19 @@ int BulkLoad::processJob() //-------------------------------------------------------------------------- // Validate the existence of the import data files //-------------------------------------------------------------------------- - std::vector tables; + std::vector> tables; for (i = 0; i < curJob.jobTableList.size(); i++) { - TableInfo* tableInfo = new TableInfo(&fLog, fTxnID, fProcessName, curJob.jobTableList[i].mapOid, 
- curJob.jobTableList[i].tblName, fKeepRbMetaFiles); + std::shared_ptr tableInfo(new TableInfo(&fLog, fTxnID, fProcessName, curJob.jobTableList[i].mapOid, + curJob.jobTableList[i].tblName, fKeepRbMetaFiles)); if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) || (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC)) tableInfo->setBulkLoadMode(fBulkMode, fBRMRptFileName); tableInfo->setErrorDir(string(getErrorDir())); tableInfo->setTruncationAsError(getTruncationAsError()); - rc = manageImportDataFileList(curJob, i, tableInfo); + rc = manageImportDataFileList(curJob, i, tableInfo.get()); if (rc != NO_ERROR) { @@ -1515,7 +1515,7 @@ int BulkLoad::rollbackLockedTables() for (unsigned i = 0; i < fTableInfo.size(); i++) { - if (fTableInfo[i].isTableLocked()) + if (fTableInfo[i]->isTableLocked()) { lockedTableFound = true; break; @@ -1529,10 +1529,10 @@ int BulkLoad::rollbackLockedTables() // Report the tables that were successfully loaded for (unsigned i = 0; i < fTableInfo.size(); i++) { - if (!fTableInfo[i].isTableLocked()) + if (!fTableInfo[i]->isTableLocked()) { ostringstream oss; - oss << "Table " << fTableInfo[i].getTableName() << " was successfully loaded. "; + oss << "Table " << fTableInfo[i]->getTableName() << " was successfully loaded. "; fLog.logMsg(oss.str(), MSGLVL_INFO1); } } @@ -1540,24 +1540,24 @@ int BulkLoad::rollbackLockedTables() // Report the tables that were not successfully loaded for (unsigned i = 0; i < fTableInfo.size(); i++) { - if (fTableInfo[i].isTableLocked()) + if (fTableInfo[i]->isTableLocked()) { - if (fTableInfo[i].hasProcessingBegun()) + if (fTableInfo[i]->hasProcessingBegun()) { ostringstream oss; - oss << "Table " << fTableInfo[i].getTableName() << " (OID-" << fTableInfo[i].getTableOID() << ")" + oss << "Table " << fTableInfo[i]->getTableName() << " (OID-" << fTableInfo[i]->getTableOID() << ")" << " was not successfully loaded. Rolling back."; fLog.logMsg(oss.str(), MSGLVL_INFO1); } else { ostringstream oss; - oss << "Table " << fTableInfo[i].getTableName() << " (OID-" << fTableInfo[i].getTableOID() << ")" + oss << "Table " << fTableInfo[i]->getTableName() << " (OID-" << fTableInfo[i]->getTableOID() << ")" << " did not start loading. No rollback necessary."; fLog.logMsg(oss.str(), MSGLVL_INFO1); } - rc = rollbackLockedTable(fTableInfo[i]); + rc = rollbackLockedTable(*fTableInfo[i]); if (rc != NO_ERROR) { @@ -1623,9 +1623,9 @@ bool BulkLoad::addErrorMsg2BrmUpdater(const std::string& tablename, const ostrin for (int tableId = 0; tableId < size; tableId++) { - if (fTableInfo[tableId].getTableName() == tablename) + if (fTableInfo[tableId]->getTableName() == tablename) { - fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str()); + fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str()); return true; } } diff --git a/writeengine/bulk/we_bulkload.h b/writeengine/bulk/we_bulkload.h index 39841b23a..926ff6c93 100644 --- a/writeengine/bulk/we_bulkload.h +++ b/writeengine/bulk/we_bulkload.h @@ -77,7 +77,7 @@ class BulkLoad : public FileOp /** * @brief Pre process jobs to validate and assign values to the job structure */ - int preProcess(Job& job, int tableNo, TableInfo* tableInfo); + int preProcess(Job& job, int tableNo, std::shared_ptr& tableInfo); /** * @brief Print job information @@ -194,7 +194,7 @@ class BulkLoad : public FileOp std::string fAlternateImportDir; // Alternate bulk import directory std::string fErrorDir; // Opt. 
diff --git a/writeengine/bulk/we_bulkload.h b/writeengine/bulk/we_bulkload.h
index 39841b23a..926ff6c93 100644
--- a/writeengine/bulk/we_bulkload.h
+++ b/writeengine/bulk/we_bulkload.h
@@ -77,7 +77,7 @@ class BulkLoad : public FileOp
 /**
  * @brief Pre process jobs to validate and assign values to the job structure
  */
- int preProcess(Job& job, int tableNo, TableInfo* tableInfo);
+ int preProcess(Job& job, int tableNo, std::shared_ptr<TableInfo>& tableInfo);
 /**
  * @brief Print job information
  */
@@ -194,7 +194,7 @@ class BulkLoad : public FileOp
 std::string fAlternateImportDir; // Alternate bulk import directory
 std::string fErrorDir; // Opt. where error records record
 std::string fProcessName; // Application process name
- static boost::ptr_vector<TableInfo> fTableInfo; // Vector of Table information
+ static std::vector<std::shared_ptr<TableInfo>> fTableInfo; // Vector of Table information
 int fNoOfParseThreads; // Number of parse threads
 int fNoOfReadThreads; // Number of read threads
 boost::thread_group fReadThreads; // Read thread group
diff --git a/writeengine/bulk/we_bulkloadbuffer.cpp b/writeengine/bulk/we_bulkloadbuffer.cpp
index 2b245181b..16739a686 100644
--- a/writeengine/bulk/we_bulkloadbuffer.cpp
+++ b/writeengine/bulk/we_bulkloadbuffer.cpp
@@ -1670,7 +1670,7 @@ int BulkLoadBuffer::parseColParquet(ColumnInfo& columnInfo)
 // not aux column
 if (isNonAuxColumn)
 {
- columnData = fParquetBatch->column(columnId);
+ columnData = fParquetBatchParser->column(columnId);
 }
 else // aux column
 {
@@ -1789,6 +1789,13 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
 int width = column.width;
 
+ int bufIndex = 1;
+
+ if (columnData->data()->buffers.size() < 2)
+ {
+ bufIndex = 0;
+ }
+
 //--------------------------------------------------------------------------
 // Parse based on column data type
 //--------------------------------------------------------------------------
@@ -1799,7 +1806,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
 //----------------------------------------------------------------------
 case WriteEngine::WR_FLOAT:
 {
- const float* dataPtr = columnData->data()->GetValues<float>(1);
+ const float* dataPtr = columnData->data()->GetValues<float>(bufIndex);
 
 for (uint32_t i = 0; i < fTotalReadRowsParser; i++)
 {
@@ -1855,7 +1862,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
 //----------------------------------------------------------------------
 case WriteEngine::WR_DOUBLE:
 {
- const double* dataPtr = columnData->data()->GetValues<double>(1);
+ const double* dataPtr = columnData->data()->GetValues<double>(bufIndex);
 
 for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
 {
@@ -2005,7 +2012,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
 case WriteEngine::WR_SHORT:
 {
 long long origVal;
- const short* dataPtr = columnData->data()->GetValues<short>(1);
+ const short* dataPtr = columnData->data()->GetValues<short>(bufIndex);
 
 for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
 {
@@ -2085,7 +2092,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
 case WriteEngine::WR_USHORT:
 {
 int64_t origVal = 0;
- const uint16_t* dataPtr = columnData->data()->GetValues<uint16_t>(1);
+ const uint16_t* dataPtr = columnData->data()->GetValues<uint16_t>(bufIndex);
 
 for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
 {
@@ -2161,7 +2168,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
 // if use int8_t here, it will take 8 bool value of parquet array
 std::shared_ptr<arrow::BooleanArray> boolArray = std::static_pointer_cast<arrow::BooleanArray>(columnData);
- const int8_t* dataPtr = columnData->data()->GetValues<int8_t>(1);
+ const int8_t* dataPtr = columnData->data()->GetValues<int8_t>(bufIndex);
 
 for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
 {
@@ -2250,7 +2257,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
 // special handling for aux column to fix segmentation error
 if (columnData->type_id() != arrow::Type::type::NA)
 {
- const uint8_t* dataPtr = columnData->data()->GetValues<uint8_t>(1);
+ const uint8_t* dataPtr = columnData->data()->GetValues<uint8_t>(bufIndex);
 
 for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
 {
@@ -2382,7 +2389,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
 if (column.dataType != CalpontSystemCatalog::DATETIME && column.dataType != CalpontSystemCatalog::TIMESTAMP &&
     column.dataType != CalpontSystemCatalog::TIME)
 {
- const long long* dataPtr = columnData->data()->GetValues<long long>(1);
+ const long long* dataPtr = columnData->data()->GetValues<long long>(bufIndex);
 
 for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
 {
@@ -2740,7 +2747,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
     std::static_pointer_cast<arrow::Decimal128Array>(columnData);
 std::shared_ptr<arrow::Decimal128Type> fType = std::static_pointer_cast<arrow::Decimal128Type>(decimalArray->type());
- const int128_t* dataPtr = decimalArray->data()->GetValues<int128_t>(1);
+ const int128_t* dataPtr = decimalArray->data()->GetValues<int128_t>(bufIndex);
 
 for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
 {
@@ -2803,7 +2810,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
 //----------------------------------------------------------------------
 case WriteEngine::WR_ULONGLONG:
 {
- const uint64_t* dataPtr = columnData->data()->GetValues<uint64_t>(1);
+ const uint64_t* dataPtr = columnData->data()->GetValues<uint64_t>(bufIndex);
 
 for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
 {
@@ -2869,7 +2876,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
 case WriteEngine::WR_UINT:
 {
 int64_t origVal;
- const uint32_t* dataPtr = columnData->data()->GetValues<uint32_t>(1);
+ const uint32_t* dataPtr = columnData->data()->GetValues<uint32_t>(bufIndex);
 
 for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
 {
@@ -2947,7 +2954,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
 {
 if (column.dataType != CalpontSystemCatalog::DATE)
 {
- const int* dataPtr = columnData->data()->GetValues<int>(1);
+ const int* dataPtr = columnData->data()->GetValues<int>(bufIndex);
 long long origVal;
 
 for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
@@ -3724,6 +3731,7 @@ int BulkLoadBuffer::fillFromFileParquet(RID& totalReadRows, RID& correctTotalRow
 
 try
 {
+ fParquetBatch.reset();
 PARQUET_THROW_NOT_OK(fParquetReader->ReadNext(&fParquetBatch));
 fStartRow = correctTotalRows;
 fStartRowForLogging = totalReadRows;
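
The convertParquet() hunks above stop hard-coding Arrow buffer index 1: the values of a primitive array normally sit in buffers[1] of its ArrayData (buffers[0] is the validity bitmap), but an array that carries fewer than two buffers (for example a null-typed placeholder column) needs index 0. A minimal sketch of the same guard, assuming Apache Arrow C++ is available (not ColumnStore code):

// Build a small Int32 column and read its raw values through ArrayData,
// choosing the buffer index defensively the way the patch does.
#include <arrow/api.h>
#include <iostream>

arrow::Status run()
{
  arrow::Int32Builder builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({10, 20, 30}));
  std::shared_ptr<arrow::Array> column;
  ARROW_RETURN_NOT_OK(builder.Finish(&column));

  int bufIndex = 1;
  if (column->data()->buffers.size() < 2)  // e.g. a null/placeholder column
    bufIndex = 0;

  const int32_t* values = column->data()->GetValues<int32_t>(bufIndex);
  for (int64_t i = 0; i < column->length(); ++i)
    std::cout << values[i] << '\n';
  return arrow::Status::OK();
}

int main()
{
  arrow::Status st = run();
  if (!st.ok())
  {
    std::cerr << st.ToString() << '\n';
    return 1;
  }
  return 0;
}
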
diff --git a/writeengine/bulk/we_extentstripealloc.cpp b/writeengine/bulk/we_extentstripealloc.cpp
index 8482cd834..98463e1b3 100644
--- a/writeengine/bulk/we_extentstripealloc.cpp
+++ b/writeengine/bulk/we_extentstripealloc.cpp
@@ -140,7 +140,7 @@ int ExtentStripeAlloc::allocateExtent(OID oid, uint16_t dbRoot,
 startLbid = extentEntryIter->second.fStartLbid;
 allocSize = extentEntryIter->second.fAllocSize;
 hwm = extentEntryIter->second.fHwm;
- errMsg = extentEntryIter->second.fStatusMsg;
+ errMsg = *extentEntryIter->second.fStatusMsg;
 retStatus = extentEntryIter->second.fStatus;
 
 fMap.erase(extentEntryIter);
@@ -274,7 +274,7 @@ void ExtentStripeAlloc::print()
 << "; seg: " << iter->second.fSegNum << "; lbid: " << iter->second.fStartLbid
 << "; size: " << iter->second.fAllocSize << "; hwm: " << iter->second.fHwm
 << "; stripe: " << iter->second.fStripeKey << "; stat: " << iter->second.fStatus
- << "; msg: " << iter->second.fStatusMsg;
+ << "; msg: " << *iter->second.fStatusMsg;
 }
 }
 else
diff --git a/writeengine/bulk/we_extentstripealloc.h b/writeengine/bulk/we_extentstripealloc.h
index 7d30d4dbe..acdee123e 100644
--- a/writeengine/bulk/we_extentstripealloc.h
+++ b/writeengine/bulk/we_extentstripealloc.h
@@ -46,21 +46,6 @@ class Log;
 class AllocExtEntry
 {
 public:
- // Default constructor
- AllocExtEntry()
-  : fOid(0)
-  , fColWidth(0)
-  , fDbRoot(0)
-  , fPartNum(0)
-  , fSegNum(0)
-  , fStartLbid(0)
-  , fAllocSize(0)
-  , fHwm(0)
-  , fStatus(NO_ERROR)
-  , fStripeKey(0)
- {
- }
-
 // Used to create entry for an existing extent we are going to add data to.
 AllocExtEntry(OID& oid, int colWidth, uint16_t dbRoot, uint32_t partNum, uint16_t segNum,
               BRM::LBID_t startLbid, int allocSize, HWM hwm, int status, const std::string& statusMsg,
@@ -74,22 +59,22 @@ class AllocExtEntry
 , fAllocSize(allocSize)
 , fHwm(hwm)
 , fStatus(status)
- , fStatusMsg(statusMsg)
+ , fStatusMsg(new std::string(statusMsg))
 , fStripeKey(stripeKey)
 {
 }
 
- OID fOid; // column OID
- int fColWidth; // colum width (in bytes)
- uint16_t fDbRoot; // DBRoot of allocated extent
- uint32_t fPartNum; // Partition number of allocated extent
- uint16_t fSegNum; // Segment number of allocated extent
- BRM::LBID_t fStartLbid; // Starting LBID of allocated extent
- int fAllocSize; // Number of allocated LBIDS
- HWM fHwm; // Starting fbo or hwm of allocated extent
- int fStatus; // Status of extent allocation
- std::string fStatusMsg; // Status msg of extent allocation
- unsigned int fStripeKey; // "Stripe" identifier for this extent
+ OID fOid = 0; // column OID
+ int fColWidth = 0; // colum width (in bytes)
+ uint16_t fDbRoot = 0; // DBRoot of allocated extent
+ uint32_t fPartNum = 0; // Partition number of allocated extent
+ uint16_t fSegNum = 0; // Segment number of allocated extent
+ BRM::LBID_t fStartLbid = 0; // Starting LBID of allocated extent
+ int fAllocSize = 0; // Number of allocated LBIDS
+ HWM fHwm = 0; // Starting fbo or hwm of allocated extent
+ int fStatus = NO_ERROR; // Status of extent allocation
+ std::shared_ptr<std::string> fStatusMsg{new std::string()}; // Status msg of extent allocation
+ unsigned int fStripeKey = 0; // "Stripe" identifier for this extent
 };
 
 //------------------------------------------------------------------------------
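
In we_extentstripealloc.h the hand-written default constructor is dropped in favour of C++11 in-class default member initializers, and fStatusMsg becomes a std::shared_ptr<std::string>, so every copy of an entry refers to the same message object (hence the extra * dereferences in we_extentstripealloc.cpp above). A minimal sketch of that pattern, with Entry as a hypothetical stand-in for AllocExtEntry:

// In-class initializers make the compiler-generated default constructor enough,
// and the shared_ptr member is shared by all copies of the entry.
#include <iostream>
#include <memory>
#include <string>

struct Entry
{
  int fOid = 0;
  int fStatus = 0;
  std::shared_ptr<std::string> fStatusMsg{new std::string()};
};

int main()
{
  Entry a;                        // default-constructed, all members initialized
  *a.fStatusMsg = "extent allocated";
  Entry b = a;                    // the copy shares the same status message
  std::cout << *b.fStatusMsg << '\n';
}
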
diff --git a/writeengine/bulk/we_tableinfo.cpp b/writeengine/bulk/we_tableinfo.cpp
index 7b4c2d0ee..5c6c8eb3b 100644
--- a/writeengine/bulk/we_tableinfo.cpp
+++ b/writeengine/bulk/we_tableinfo.cpp
@@ -182,7 +182,7 @@ TableInfo::TableInfo(Log* logger, const BRM::TxnID txnID, const string& processN
 TableInfo::~TableInfo()
 {
 fBRMReporter.sendErrMsgToFile(fBRMRptFileName);
- freeProcessingBuffers();
+ //freeProcessingBuffers();
 }
 
 //------------------------------------------------------------------------------
diff --git a/writeengine/bulk/we_workers.cpp b/writeengine/bulk/we_workers.cpp
index dcd4f1d54..26870e88d 100644
--- a/writeengine/bulk/we_workers.cpp
+++ b/writeengine/bulk/we_workers.cpp
@@ -95,16 +95,16 @@ void BulkLoad::read(int id)
 #ifdef PROFILE
 Stats::stopReadEvent(WE_STATS_WAIT_TO_SELECT_TBL);
 #endif
- int rc = fTableInfo[tableId].readTableData();
+ int rc = fTableInfo[tableId]->readTableData();
 
 if (rc != NO_ERROR)
 {
 // Error occurred while reading the data, break out of loop.
 BulkStatus::setJobStatus(EXIT_FAILURE);
 ostringstream oss;
- oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId].getTableName()
+ oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
 << ". Terminating this job.";
- fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str());
+ fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
 fLog.logMsg(oss.str(), rc, MSGLVL_CRITICAL);
 break;
 }
@@ -117,7 +117,7 @@ void BulkLoad::read(int id)
 
 if (tableId != -1)
 oss << "Bulkload Read (thread " << id << ") Stopped reading Table "
- << fTableInfo[tableId].getTableName() << ". " << ex.what();
+ << fTableInfo[tableId]->getTableName() << ". " << ex.what();
 else
 oss << "Bulkload Read (thread " << id << ") Stopped reading Tables. " << ex.what();
@@ -129,14 +129,14 @@
 ostringstream oss;
 
 if (tableId != -1)
- oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId].getTableName()
+ oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
 << ". " << ex.what() << ". Terminating this job.";
 else
 oss << "Bulkload Read (thread " << id << ") Failed for Table. " << ex.what()
 << ". Terminating this job.";
 
 if (tableId != -1)
- fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str());
+ fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
 
 fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_CRITICAL);
 }
@@ -146,13 +146,13 @@
 ostringstream oss;
 
 if (tableId != -1)
- oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId].getTableName()
+ oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
 << ". Terminating this job.";
 else
 oss << "Bulkload Read (thread " << id << ") Failed for Table. Terminating this job.";
 
 if (tableId != -1)
- fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str());
+ fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
 
 fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_CRITICAL);
 }
@@ -170,7 +170,7 @@ int BulkLoad::lockTableForRead(int id)
 
 for (unsigned i = 0; i < fTableInfo.size(); ++i)
 {
- if (fTableInfo[i].lockForRead(id))
+ if (fTableInfo[i]->lockForRead(id))
 return i;
 }
 
@@ -292,16 +292,16 @@ void BulkLoad::parse(int id)
 // Have obtained the table and column for parsing.
 // Start parsing the column data.
 double processingTime;
- int rc = fTableInfo[tableId].parseColumn(columnId, myParseBuffer, processingTime);
+ int rc = fTableInfo[tableId]->parseColumn(columnId, myParseBuffer, processingTime);
 
 if (rc != NO_ERROR)
 {
 // Error occurred while parsing the data, break out of loop.
 BulkStatus::setJobStatus(EXIT_FAILURE);
 ostringstream oss;
- oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId].getTableName()
+ oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
 << " during parsing. Terminating this job.";
- fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str());
+ fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
 fLog.logMsg(oss.str(), rc, MSGLVL_CRITICAL);
 
 setParseErrorOnTable(tableId, true);
@@ -310,7 +310,7 @@ void BulkLoad::parse(int id)
 
 // Parsing is complete. Acquire the mutex and increment
 // the parsingComplete value for the buffer
- if (fTableInfo[tableId].getStatusTI() != WriteEngine::ERR)
+ if (fTableInfo[tableId]->getStatusTI() != WriteEngine::ERR)
 {
 #ifdef PROFILE
 Stats::startParseEvent(WE_STATS_WAIT_TO_COMPLETE_PARSE);
@@ -320,15 +320,15 @@ void BulkLoad::parse(int id)
 Stats::stopParseEvent(WE_STATS_WAIT_TO_COMPLETE_PARSE);
 Stats::startParseEvent(WE_STATS_COMPLETING_PARSE);
 #endif
- rc = fTableInfo[tableId].setParseComplete(columnId, myParseBuffer, processingTime);
+ rc = fTableInfo[tableId]->setParseComplete(columnId, myParseBuffer, processingTime);
 
 if (rc != NO_ERROR)
 {
 BulkStatus::setJobStatus(EXIT_FAILURE);
 ostringstream oss;
 oss << "Bulkload Parse (thread " << id << ") Failed for Table "
- << fTableInfo[tableId].getTableName() << " during parse completion. Terminating this job.";
- fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str());
+ << fTableInfo[tableId]->getTableName() << " during parse completion. Terminating this job.";
+ fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
 fLog.logMsg(oss.str(), rc, MSGLVL_CRITICAL);
 
 setParseErrorOnTable(tableId, false);
@@ -349,7 +349,7 @@ void BulkLoad::parse(int id)
 if (tableId != -1)
 {
 oss << "Bulkload Parse (thread " << id << ") Stopped parsing Table "
- << fTableInfo[tableId].getTableName() << ". " << ex.what();
+ << fTableInfo[tableId]->getTableName() << ". " << ex.what();
 
 setParseErrorOnTable(tableId, true);
 }
@@ -367,11 +367,11 @@ void BulkLoad::parse(int id)
 
 if (tableId != -1)
 {
- oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId].getTableName()
+ oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
 << ". " << ex.what() << ". Terminating this job.";
 
 setParseErrorOnTable(tableId, true);
- fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str());
+ fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
 }
 else
 {
@@ -388,11 +388,11 @@ void BulkLoad::parse(int id)
 
 if (tableId != -1)
 {
- oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId].getTableName()
+ oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
 << ". Terminating this job.";
 
 setParseErrorOnTable(tableId, true);
- fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str());
+ fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
 }
 else
 {
@@ -421,10 +421,10 @@ bool BulkLoad::lockColumnForParse(int thrdId, int& tableId, int& columnId, int&
 
 for (unsigned i = 0; i < fTableInfo.size(); ++i)
 {
- if (fTableInfo[i].getStatusTI() == WriteEngine::PARSE_COMPLETE)
+ if (fTableInfo[i]->getStatusTI() == WriteEngine::PARSE_COMPLETE)
 continue;
 
- int currentParseBuffer = fTableInfo[i].getCurrentParseBuffer();
+ int currentParseBuffer = fTableInfo[i]->getCurrentParseBuffer();
 myParseBuffer = currentParseBuffer;
 
 do
@@ -434,7 +434,7 @@ bool BulkLoad::lockColumnForParse(int thrdId, int& tableId, int& columnId, int&
 {
 ostringstream oss;
 std::string bufStatusStr;
- Status stat = fTableInfo[i].getStatusTI();
+ Status stat = fTableInfo[i]->getStatusTI();
 ColumnInfo::convertStatusToString(stat, bufStatusStr);
 oss << " - " << pthread_self() << ":fTableInfo[" << i << "]" << bufStatusStr << " (" << stat
 << ")";
@@ -452,13 +452,13 @@ bool BulkLoad::lockColumnForParse(int thrdId, int& tableId, int& columnId, int&
 // @bug2099-
 // get a buffer and column to parse if available.
- if ((columnId = fTableInfo[i].getColumnForParse(thrdId, myParseBuffer, report)) != -1)
+ if ((columnId = fTableInfo[i]->getColumnForParse(thrdId, myParseBuffer, report)) != -1)
 {
 tableId = i;
 return true;
 }
 
- myParseBuffer = (myParseBuffer + 1) % fTableInfo[i].getNumberOfBuffers();
+ myParseBuffer = (myParseBuffer + 1) % fTableInfo[i]->getNumberOfBuffers();
 } while (myParseBuffer != currentParseBuffer);
 }
 
@@ -479,10 +479,10 @@ bool BulkLoad::allTablesDone(Status status)
 {
 for (unsigned i = 0; i < fTableInfo.size(); ++i)
 {
- if (fTableInfo[i].getStatusTI() == WriteEngine::ERR)
+ if (fTableInfo[i]->getStatusTI() == WriteEngine::ERR)
 return true;
 
- if (fTableInfo[i].getStatusTI() != status)
+ if (fTableInfo[i]->getStatusTI() != status)
 return false;
 }
 
@@ -499,11 +499,11 @@ void BulkLoad::setParseErrorOnTable(int tableId, bool lockParseMutex)
 if (lockParseMutex)
 {
 boost::mutex::scoped_lock lock(fParseMutex);
- fTableInfo[tableId].setParseError();
+ fTableInfo[tableId]->setParseError();
 }
 else
 {
- fTableInfo[tableId].setParseError();
+ fTableInfo[tableId]->setParseError();
 }
 }
diff --git a/writeengine/shared/we_brm.cpp b/writeengine/shared/we_brm.cpp
index e065630b5..ffc783c78 100644
--- a/writeengine/shared/we_brm.cpp
+++ b/writeengine/shared/we_brm.cpp
@@ -57,6 +57,7 @@ using namespace execplan;
 namespace WriteEngine
 {
 BRMWrapper* volatile BRMWrapper::m_instance = NULL;
+std::atomic<bool> BRMWrapper::finishReported(false);
 boost::thread_specific_ptr BRMWrapper::m_ThreadDataPtr;
 boost::mutex BRMWrapper::m_instanceCreateMutex;
 
@@ -750,6 +751,10 @@ uint8_t BRMWrapper::newCpimportJob(uint32_t &jobId)
 
 void BRMWrapper::finishCpimportJob(uint32_t jobId)
 {
+ if (finishReported.exchange(true))  // get old and set to true; if old is true, do nothing.
+ {
+   return;
+ }
 blockRsltnMgrPtr->finishCpimportJob(jobId);
 }
 
diff --git a/writeengine/shared/we_brm.h b/writeengine/shared/we_brm.h
index d8bc403ae..e091e38ae 100644
--- a/writeengine/shared/we_brm.h
+++ b/writeengine/shared/we_brm.h
@@ -23,6 +23,7 @@
 
 #pragma once
 
+#include <atomic>
 #include 
 #include 
 #include 
@@ -474,6 +475,8 @@ class BRMWrapper : public WEObj
 static IDBDataFile* m_curVBFile;
 
 BRM::DBRM* blockRsltnMgrPtr;
+
+ EXPORT static std::atomic<bool> finishReported;
 };
 
 //------------------------------------------------------------------------------
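
The finishCpimportJob() change above guards the call with std::atomic<bool>::exchange(true), which atomically sets the flag and returns its previous value, so only the first caller reports the finish even when several threads race on it. A minimal standalone sketch of the idiom (finishOnce is a hypothetical stand-in, not ColumnStore code):

// exchange(true) returns the old value: exactly one caller sees false and
// performs the one-time work; every later caller returns immediately.
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

std::atomic<bool> finishReported(false);

void finishOnce(int caller)
{
  if (finishReported.exchange(true))
    return;  // someone else already reported the finish
  std::cout << "finish reported by caller " << caller << '\n';
}

int main()
{
  std::vector<std::thread> threads;
  for (int i = 0; i < 4; ++i)
    threads.emplace_back(finishOnce, i);
  for (auto& t : threads)
    t.join();  // exactly one "finish reported" line is printed
}
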