diff --git a/dbcon/ddlpackage/ddl.l b/dbcon/ddlpackage/ddl.l index c0aec2a94..5404542f0 100644 --- a/dbcon/ddlpackage/ddl.l +++ b/dbcon/ddlpackage/ddl.l @@ -34,7 +34,7 @@ int ddldebug = 0; int lineno = 1; void ddlerror(struct pass_to_bison* x, char const *s); -static char* scanner_copy(char *str, yyscan_t yyscanner, copy_action_t action = NOOP ); +static char* scanner_copy(const char *str, yyscan_t yyscanner, copy_action_t action = NOOP ); %} @@ -115,9 +115,9 @@ CONSTRAINT {return CONSTRAINT;} CONSTRAINTS {return CONSTRAINTS;} CREATE {return CREATE;} CURRENT_USER {return CURRENT_USER;} -DATE {ddlget_lval(yyscanner)->str=strdup("date"); return DATE;} +DATE {ddlget_lval(yyscanner)->str = scanner_copy("date", yyscanner); return DATE;} DATETIME {return DATETIME;} -TIME {ddlget_lval(yyscanner)->str=strdup("time"); return TIME;} +TIME {ddlget_lval(yyscanner)->str = scanner_copy("time", yyscanner); return TIME;} TIMESTAMP {return TIMESTAMP;} DECIMAL {return DECIMAL;} DEC {return DECIMAL;} @@ -276,7 +276,7 @@ void scanner_finish(yyscan_t yyscanner) pScanData->valbuf.clear(); } -char* scanner_copy (char *str, yyscan_t yyscanner, copy_action_t action) +char* scanner_copy (const char *str, yyscan_t yyscanner, copy_action_t action) { char* result; char* nv = strdup(str); diff --git a/dbcon/ddlpackage/ddl.y b/dbcon/ddlpackage/ddl.y index 2d47e70c0..11a51c5f3 100644 --- a/dbcon/ddlpackage/ddl.y +++ b/dbcon/ddlpackage/ddl.y @@ -105,7 +105,6 @@ void fix_column_length_and_charset(SchemaObject* elem, const CHARSET_INFO* def_c column->fType->fLength = 16777215; } } - %} %expect 17 @@ -255,6 +254,24 @@ ZEROFILL %type opt_quoted_literal %type opt_column_charset %type opt_column_collate + +// for pointers to vectors of pointers: +%destructor { if ($$) { for (auto p : *($$)) { delete p; } }; delete $$; } table_element_list + +// for objects allocated during parse:. +%destructor { delete $$; } qualified_name ident table_element column_def exact_numeric_type +%destructor { delete $$; } table_options opt_table_options table_name +%destructor { delete $$; } column_name_list data_type column_constraint +%destructor { delete $$; } column_constraint_def column_qualifier_list +%destructor { delete $$; } opt_referential_triggered_action referential_triggered_action +%destructor { delete $$; } table_option character_string_type binary_string_type blob_type +%destructor { delete $$; } text_type numeric_type table_constraint table_constraint_def + +// NOTE: if you have a leak in this code and do not know which one leaks +// add %destructor { printf("pop yykind %d\n"; fflush(stdout); } <*> +// this will print tags in residual stack after syntax error and you'll see +// what is not delete'd. + %% stmtblock: stmtmulti { x->fParseTree = $1; } ; @@ -685,7 +702,7 @@ qualified_name: if (x->fDBSchema.size()) $$ = new QualifiedName((char*)x->fDBSchema.c_str(), $1); else - $$ = new QualifiedName($1); + $$ = new QualifiedName($1); } | ident '.' ident { diff --git a/dbcon/dmlpackage/dml.l b/dbcon/dmlpackage/dml.l index fca2d3f25..babff264e 100644 --- a/dbcon/dmlpackage/dml.l +++ b/dbcon/dmlpackage/dml.l @@ -42,7 +42,7 @@ namespace dmlpackage { int lineno = 1; -static char* scanner_copy (char *str, yyscan_t yyscanner); +static char* scanner_copy (const char *str, yyscan_t yyscanner); /* macro to save the text and return a token */ @@ -256,7 +256,7 @@ void scanner_finish(yyscan_t yyscanner) pScanData->valbuf.clear(); } -char* scanner_copy (char *str, yyscan_t yyscanner) +char* scanner_copy (const char *str, yyscan_t yyscanner) { char* nv = strdup(str); if(nv) diff --git a/dbcon/mysql/ha_exists_sub.cpp b/dbcon/mysql/ha_exists_sub.cpp index a97abfa44..181d1c83c 100644 --- a/dbcon/mysql/ha_exists_sub.cpp +++ b/dbcon/mysql/ha_exists_sub.cpp @@ -96,7 +96,7 @@ execplan::ParseTree* ExistsSub::transform() csep->subType(CalpontSelectExecutionPlan::EXISTS_SUBS); // gwi for the sub query - gp_walk_info gwi(fGwip.timeZone); + gp_walk_info gwi(fGwip.timeZone, fGwip.subQueriesChain); gwi.thd = fGwip.thd; gwi.subQuery = this; diff --git a/dbcon/mysql/ha_from_sub.cpp b/dbcon/mysql/ha_from_sub.cpp index 9013c8fe6..177887323 100644 --- a/dbcon/mysql/ha_from_sub.cpp +++ b/dbcon/mysql/ha_from_sub.cpp @@ -424,7 +424,7 @@ SCSEP FromSubQuery::transform() csep->subType(CalpontSelectExecutionPlan::FROM_SUBS); // gwi for the sub query - gp_walk_info gwi(fGwip.timeZone); + gp_walk_info gwi(fGwip.timeZone, fGwip.subQueriesChain); gwi.thd = fGwip.thd; gwi.subQuery = this; gwi.viewName = fGwip.viewName; diff --git a/dbcon/mysql/ha_in_sub.cpp b/dbcon/mysql/ha_in_sub.cpp index 989e06ad5..8b2772efd 100644 --- a/dbcon/mysql/ha_in_sub.cpp +++ b/dbcon/mysql/ha_in_sub.cpp @@ -151,7 +151,7 @@ execplan::ParseTree* InSub::transform() csep->subType(CalpontSelectExecutionPlan::IN_SUBS); // gwi for the sub query - gp_walk_info gwi(fGwip.timeZone); + gp_walk_info gwi(fGwip.timeZone, fGwip.subQueriesChain); gwi.thd = fGwip.thd; gwi.subQuery = this; diff --git a/dbcon/mysql/ha_mcs_ddl.cpp b/dbcon/mysql/ha_mcs_ddl.cpp index 0ff7c48ee..0acde2392 100644 --- a/dbcon/mysql/ha_mcs_ddl.cpp +++ b/dbcon/mysql/ha_mcs_ddl.cpp @@ -344,7 +344,7 @@ bool anyRowInTable(string& schema, string& tableName, int sessionID) rowgroup::RGData rgData; ByteStream::quadbyte qb = 4; msg << qb; - rowgroup::RowGroup* rowGroup = 0; + std::shared_ptr rowGroup = 0; bool anyRow = false; exemgrClient->write(msg); @@ -397,7 +397,7 @@ bool anyRowInTable(string& schema, string& tableName, int sessionID) if (!rowGroup) { // This is mete data - rowGroup = new rowgroup::RowGroup(); + rowGroup.reset(new rowgroup::RowGroup()); rowGroup->deserialize(msg); qb = 100; msg.restart(); @@ -515,7 +515,7 @@ bool anyTimestampColumn(string& schema, string& tableName, int sessionID) rowgroup::RGData rgData; ByteStream::quadbyte qb = 4; msg << qb; - rowgroup::RowGroup* rowGroup = 0; + std::shared_ptr rowGroup = 0; bool anyRow = false; exemgrClient->write(msg); @@ -568,7 +568,7 @@ bool anyTimestampColumn(string& schema, string& tableName, int sessionID) if (!rowGroup) { // This is mete data - rowGroup = new rowgroup::RowGroup(); + rowGroup.reset(new rowgroup::RowGroup()); rowGroup->deserialize(msg); qb = 100; msg.restart(); diff --git a/dbcon/mysql/ha_mcs_execplan.cpp b/dbcon/mysql/ha_mcs_execplan.cpp index 2645be69e..69cc63e6f 100644 --- a/dbcon/mysql/ha_mcs_execplan.cpp +++ b/dbcon/mysql/ha_mcs_execplan.cpp @@ -293,13 +293,66 @@ string escapeBackTick(const char* str) return ret; } -void clearStacks(gp_walk_info& gwi) +cal_impl_if::gp_walk_info::~gp_walk_info() +{ + while (!rcWorkStack.empty()) + { + delete rcWorkStack.top(); + rcWorkStack.pop(); + } + + while (!ptWorkStack.empty()) + { + delete ptWorkStack.top(); + ptWorkStack.pop(); + } + for (uint32_t i=0;i& join_list, ParseTree* pt = new ParseTree(onFilter); outerJoinStack.push(pt); } + } else // inner join { @@ -1545,7 +1599,7 @@ ParseTree* buildRowPredicate(gp_walk_info* gwip, RowColumn* lhs, RowColumn* rhs, ParseTree* pt = new ParseTree(lo); sop->setOpType(lhs->columnVec()[0]->resultType(), rhs->columnVec()[0]->resultType()); - SimpleFilter* sf = new SimpleFilter(sop, lhs->columnVec()[0].get(), rhs->columnVec()[0].get()); + SimpleFilter* sf = new SimpleFilter(sop, lhs->columnVec()[0]->clone(), rhs->columnVec()[0]->clone()); sf->timeZone(gwip->timeZone); pt->left(new ParseTree(sf)); @@ -1553,7 +1607,7 @@ ParseTree* buildRowPredicate(gp_walk_info* gwip, RowColumn* lhs, RowColumn* rhs, { sop.reset(po->clone()); sop->setOpType(lhs->columnVec()[i]->resultType(), rhs->columnVec()[i]->resultType()); - SimpleFilter* sf = new SimpleFilter(sop, lhs->columnVec()[i].get(), rhs->columnVec()[i].get()); + SimpleFilter* sf = new SimpleFilter(sop, lhs->columnVec()[i]->clone(), rhs->columnVec()[i]->clone()); sf->timeZone(gwip->timeZone); pt->right(new ParseTree(sf)); @@ -1570,6 +1624,12 @@ ParseTree* buildRowPredicate(gp_walk_info* gwip, RowColumn* lhs, RowColumn* rhs, bool buildRowColumnFilter(gp_walk_info* gwip, RowColumn* rhs, RowColumn* lhs, Item_func* ifp) { + // rhs and lhs are being dismembered here and then get thrown away, leaking. + // So we create two scoped pointers to get them delete'd automatically at + // return. + // Also look below into heldoutVals vector - it contains values produced from + // ifp's arguments. + const std::unique_ptr rhsp(rhs), lhsp(lhs); if (ifp->functype() == Item_func::EQ_FUNC || ifp->functype() == Item_func::NE_FUNC) { // (c1,c2,..) = (v1,v2,...) transform to: c1=v1 and c2=v2 and ... @@ -1601,6 +1661,7 @@ bool buildRowColumnFilter(gp_walk_info* gwip, RowColumn* rhs, RowColumn* lhs, It // two entries have been popped from the stack already: lhs and rhs stack tmpStack; vector valVec; + vector heldOutVals; // these vals are not rhs/lhs and need to be freed tmpStack.push(rhs); tmpStack.push(lhs); assert(gwip->rcWorkStack.size() >= ifp->argument_count() - 2); @@ -1608,6 +1669,7 @@ bool buildRowColumnFilter(gp_walk_info* gwip, RowColumn* rhs, RowColumn* lhs, It for (uint32_t i = 2; i < ifp->argument_count(); i++) { tmpStack.push(gwip->rcWorkStack.top()); + heldOutVals.push_back(SRCP(gwip->rcWorkStack.top())); if (!gwip->rcWorkStack.empty()) gwip->rcWorkStack.pop(); @@ -1627,7 +1689,7 @@ bool buildRowColumnFilter(gp_walk_info* gwip, RowColumn* rhs, RowColumn* lhs, It vals = dynamic_cast(tmpStack.top()); valVec.push_back(vals); tmpStack.pop(); - pt1->right(buildRowPredicate(gwip, columns->clone(), vals, predicateOp)); + pt1->right(buildRowPredicate(gwip, columns, vals, predicateOp)); pt = pt1; } @@ -1644,7 +1706,7 @@ bool buildRowColumnFilter(gp_walk_info* gwip, RowColumn* rhs, RowColumn* lhs, It for (uint32_t i = 0; i < columns->columnVec().size(); i++) { - ConstantFilter* cf = new ConstantFilter(); + std::unique_ptr cf(new ConstantFilter()); sop.reset(lo->clone()); cf->op(sop); @@ -1652,7 +1714,9 @@ bool buildRowColumnFilter(gp_walk_info* gwip, RowColumn* rhs, RowColumn* lhs, It // no optimization for non-simple column because CP won't apply if (!sc) + { continue; + } ssc.reset(sc->clone()); cf->col(ssc); @@ -1674,9 +1738,11 @@ bool buildRowColumnFilter(gp_walk_info* gwip, RowColumn* rhs, RowColumn* lhs, It } if (j < valVec.size()) + { continue; + } - tmpPtStack.push(new ParseTree(cf)); + tmpPtStack.push(new ParseTree(cf.release())); } // "and" all the filters together @@ -2027,7 +2093,10 @@ bool buildPredicateItem(Item_func* ifp, gp_walk_info* gwip) } if (!gwip->rcWorkStack.empty()) + { + delete gwip->rcWorkStack.top(); gwip->rcWorkStack.pop(); // pop gwip->scsp + } if (cf->filterList().size() < inp->argument_count() - 1) { @@ -2838,7 +2907,6 @@ void setError(THD* thd, uint32_t errcode, string errmsg) void setError(THD* thd, uint32_t errcode, string errmsg, gp_walk_info& gwi) { setError(thd, errcode, errmsg); - clearStacks(gwi); } int setErrorAndReturn(gp_walk_info& gwi) @@ -4108,6 +4176,7 @@ ReturnedColumn* buildFunctionColumn(Item_func* ifp, gp_walk_info& gwi, bool& non if (!sptp) { nonSupport = true; + delete fc; return NULL; } @@ -4122,6 +4191,7 @@ ReturnedColumn* buildFunctionColumn(Item_func* ifp, gp_walk_info& gwi, bool& non nonSupport = true; gwi.fatalParseError = true; gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_SUB_EXPRESSION); + delete fc; return NULL; } @@ -4153,6 +4223,7 @@ ReturnedColumn* buildFunctionColumn(Item_func* ifp, gp_walk_info& gwi, bool& non if (!rc || nonSupport) { nonSupport = true; + delete fc; return NULL; } @@ -4181,6 +4252,7 @@ ReturnedColumn* buildFunctionColumn(Item_func* ifp, gp_walk_info& gwi, bool& non else { nonSupport = true; + delete fc; return NULL; } } @@ -4569,6 +4641,7 @@ FunctionColumn* buildCaseFunction(Item_func* item, gp_walk_info& gwi, bool& nonS gwi.inCaseStmt = false; if (!gwi.ptWorkStack.empty() && *gwi.ptWorkStack.top() == *sptp.get()) { + delete gwi.ptWorkStack.top(); gwi.ptWorkStack.pop(); } } @@ -4589,6 +4662,7 @@ FunctionColumn* buildCaseFunction(Item_func* item, gp_walk_info& gwi, bool& nonS // We need to pop whichever stack is holding it, if any. if ((!gwi.rcWorkStack.empty()) && *gwi.rcWorkStack.top() == parm) { + delete gwi.rcWorkStack.top(); gwi.rcWorkStack.pop(); } else if (!gwi.ptWorkStack.empty()) @@ -4596,7 +4670,10 @@ FunctionColumn* buildCaseFunction(Item_func* item, gp_walk_info& gwi, bool& nonS ReturnedColumn* ptrc = dynamic_cast(gwi.ptWorkStack.top()->data()); if (ptrc && *ptrc == *parm) + { + delete gwi.ptWorkStack.top(); gwi.ptWorkStack.pop(); + } } } else @@ -4606,6 +4683,7 @@ FunctionColumn* buildCaseFunction(Item_func* item, gp_walk_info& gwi, bool& nonS // We need to pop whichever stack is holding it, if any. if ((!gwi.ptWorkStack.empty()) && *gwi.ptWorkStack.top()->data() == sptp->data()) { + delete gwi.ptWorkStack.top(); gwi.ptWorkStack.pop(); } else if (!gwi.rcWorkStack.empty()) @@ -4616,6 +4694,7 @@ FunctionColumn* buildCaseFunction(Item_func* item, gp_walk_info& gwi, bool& nonS if (ptrc && *ptrc == *gwi.rcWorkStack.top()) { + delete gwi.rcWorkStack.top(); gwi.rcWorkStack.pop(); } } @@ -5287,6 +5366,18 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi) ac->constCol(SRCP(rc)); break; } + // the "rc" can be in gwi.no_parm_func_list. erase it from that list and + // then delete it. + // kludge, I know. + uint32_t i; + + for (i = 0; gwi.no_parm_func_list[i] != rc && i < gwi.no_parm_func_list.size(); i++) { } + + if (i < gwi.no_parm_func_list.size()) + { + gwi.no_parm_func_list.erase(gwi.no_parm_func_list.begin() + i); + delete rc; + } } } @@ -5982,7 +6073,10 @@ void gp_walk(const Item* item, void* arg) { // @bug 4215. remove the argument in rcWorkStack. if (!gwip->rcWorkStack.empty()) + { + delete gwip->rcWorkStack.top(); gwip->rcWorkStack.pop(); + } break; } @@ -6017,6 +6111,7 @@ void gp_walk(const Item* item, void* arg) for (uint32_t i = 0; i < ifp->argument_count() && !gwip->rcWorkStack.empty(); i++) { + delete gwip->rcWorkStack.top(); gwip->rcWorkStack.pop(); } @@ -6193,6 +6288,7 @@ void gp_walk(const Item* item, void* arg) } else { + delete rhs; gwip->ptWorkStack.push(lhs); continue; } @@ -6824,21 +6920,21 @@ int processFrom(bool& isUnion, SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& if (table_ptr->derived) { SELECT_LEX* select_cursor = table_ptr->derived->first_select(); - FromSubQuery fromSub(gwi, select_cursor); + FromSubQuery* fromSub = new FromSubQuery(gwi, select_cursor); string alias(table_ptr->alias.str); if (lower_case_table_names) { boost::algorithm::to_lower(alias); } - fromSub.alias(alias); + fromSub->alias(alias); CalpontSystemCatalog::TableAliasName tn = make_aliasview("", "", alias, viewName); // @bug 3852. check return execplan - SCSEP plan = fromSub.transform(); + SCSEP plan = fromSub->transform(); if (!plan) { - setError(gwi.thd, ER_INTERNAL_ERROR, fromSub.gwip().parseErrorText, gwi); + setError(gwi.thd, ER_INTERNAL_ERROR, fromSub->gwip().parseErrorText, gwi); CalpontSystemCatalog::removeCalpontSystemCatalog(gwi.sessionid); return ER_INTERNAL_ERROR; } @@ -6962,7 +7058,7 @@ int processFrom(bool& isUnion, SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& plan->data(csep->data()); // gwi for the union unit - gp_walk_info union_gwi(gwi.timeZone); + gp_walk_info union_gwi(gwi.timeZone, gwi.subQueriesChain); union_gwi.thd = gwi.thd; uint32_t err = 0; @@ -7165,14 +7261,28 @@ int processWhere(SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& csep, const s std::stack outerJoinStack; if ((failed = buildJoin(gwi, select_lex.top_join_list, outerJoinStack))) + { + while (!outerJoinStack.empty()) + { + delete outerJoinStack.top(); + outerJoinStack.pop(); + } return failed; + } if (gwi.subQuery) { for (uint i = 0; i < gwi.viewList.size(); i++) { if ((failed = gwi.viewList[i]->processJoin(gwi, outerJoinStack))) + { + while (!outerJoinStack.empty()) + { + delete outerJoinStack.top(); + outerJoinStack.pop(); + } return failed; + } } } @@ -7187,7 +7297,7 @@ int processWhere(SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& csep, const s // is pushed to rcWorkStack. if (gwi.ptWorkStack.empty() && !gwi.rcWorkStack.empty()) { - filters = new ParseTree(gwi.rcWorkStack.top()); + gwi.ptWorkStack.push(new ParseTree(gwi.rcWorkStack.top())); gwi.rcWorkStack.pop(); } @@ -7240,6 +7350,7 @@ int processWhere(SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& csep, const s "columnstore_max_allowed_in_values " "threshold.", gwi); + delete filters; return ER_CHECK_NOT_IMPLEMENTED; } @@ -7264,6 +7375,26 @@ int processWhere(SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& csep, const s csep->filters(filters); } + if (!gwi.rcWorkStack.empty()) + { + while(!gwi.rcWorkStack.empty()) + { + ReturnedColumn* t = gwi.rcWorkStack.top(); + delete t; + gwi.rcWorkStack.pop(); + } + } + if (!gwi.ptWorkStack.empty()) + { + while(!gwi.ptWorkStack.empty()) + { + ParseTree* t = gwi.ptWorkStack.top(); + delete t; + gwi.ptWorkStack.pop(); + } + } + + return 0; } @@ -7599,7 +7730,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i vector funcFieldVec; // empty rcWorkStack and ptWorkStack. They should all be empty by now. - clearStacks(gwi); + clearStacks(gwi, false, true); // indicate the starting pos of scalar returned column, because some join column // has been inserted to the returned column list. @@ -7936,6 +8067,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i gwi.parseErrorText = "Unsupported Item in SELECT subquery."; setError(gwi.thd, ER_CHECK_NOT_IMPLEMENTED, gwi.parseErrorText, gwi); + return ER_CHECK_NOT_IMPLEMENTED; } @@ -8047,8 +8179,8 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i // Having clause handling gwi.clauseType = HAVING; - clearStacks(gwi); - ParseTree* havingFilter = 0; + clearStacks(gwi, false, true); + std::unique_ptr havingFilter; // clear fatalParseError that may be left from post process functions gwi.fatalParseError = false; gwi.parseErrorText = ""; @@ -8074,20 +8206,20 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i // @bug 4215. some function filter will be in the rcWorkStack. if (gwi.ptWorkStack.empty() && !gwi.rcWorkStack.empty()) { - havingFilter = new ParseTree(gwi.rcWorkStack.top()); + gwi.ptWorkStack.push(new ParseTree(gwi.rcWorkStack.top())); gwi.rcWorkStack.pop(); } while (!gwi.ptWorkStack.empty()) { - havingFilter = gwi.ptWorkStack.top(); + havingFilter.reset(gwi.ptWorkStack.top()); gwi.ptWorkStack.pop(); if (gwi.ptWorkStack.empty()) break; ptp = new ParseTree(new LogicOperator("and")); - ptp->left(havingFilter); + ptp->left(havingFilter.release()); rhs = gwi.ptWorkStack.top(); gwi.ptWorkStack.pop(); ptp->right(rhs); @@ -8118,9 +8250,9 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i if (havingFilter) { ParseTree* ptp = new ParseTree(new LogicOperator("and")); - ptp->left(havingFilter); + ptp->left(havingFilter.release()); ptp->right(inToExistsFilter); - havingFilter = ptp; + havingFilter.reset(ptp); } else { @@ -8350,6 +8482,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i { if (strcasecmp(sc->alias().c_str(), gwi.returnedCols[j]->alias().c_str()) == 0) { + delete rc; rc = gwi.returnedCols[j].get()->clone(); rc->orderPos(j); break; @@ -8362,6 +8495,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i { if (ifp->name.length && string(ifp->name.str) == gwi.returnedCols[j].get()->alias()) { + delete rc; rc = gwi.returnedCols[j].get()->clone(); rc->orderPos(j); break; @@ -8713,21 +8847,6 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i { } } - else if (ord_item->type() == Item::FUNC_ITEM) - { - // @bug5636. check if this order by column is on the select list - ReturnedColumn* rc = buildFunctionColumn((Item_func*)(ord_item), gwi, gwi.fatalParseError); - - for (uint32_t i = 0; i < gwi.returnedCols.size(); i++) - { - if (rc && rc->operator==(gwi.returnedCols[i].get())) - { - ostringstream oss; - oss << i + 1; - break; - } - } - } } } @@ -8830,20 +8949,28 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i csep->orderByCols(gwi.orderByCols); csep->returnedCols(gwi.returnedCols); csep->columnMap(gwi.columnMap); - csep->having(havingFilter); + csep->having(havingFilter.release()); csep->derivedTableList(gwi.derivedTbList); csep->selectSubList(selectSubList); csep->subSelectList(gwi.subselectList); - clearStacks(gwi); return 0; } int cp_get_table_plan(THD* thd, SCSEP& csep, cal_table_info& ti, long timeZone) { - gp_walk_info* gwi = ti.condInfo; - if (!gwi) - gwi = new gp_walk_info(timeZone); + SubQueryChainHolder chainHolder; + bool allocated = false; + gp_walk_info* gwi; + if (ti.condInfo) + { + gwi = &ti.condInfo->gwi; + } + else + { + allocated = true; + gwi = new gp_walk_info(timeZone, &chainHolder.chain); + } gwi->thd = thd; LEX* lex = thd->lex; @@ -8894,7 +9021,7 @@ int cp_get_table_plan(THD* thd, SCSEP& csep, cal_table_info& ti, long timeZone) // get filter if (ti.condInfo) { - gp_walk_info* gwi = ti.condInfo; + gp_walk_info* gwi = &ti.condInfo->gwi; ParseTree* filters = 0; ParseTree* ptp = 0; ParseTree* rhs = 0; @@ -8940,6 +9067,10 @@ int cp_get_table_plan(THD* thd, SCSEP& csep, cal_table_info& ti, long timeZone) // @bug 3321. Set max number of blocks in a dictionary file to be scanned for filtering csep->stringScanThreshold(get_string_scan_threshold(gwi->thd)); + if (allocated) + { + delete gwi; + } return 0; } @@ -8949,7 +9080,8 @@ int cp_get_group_plan(THD* thd, SCSEP& csep, cal_impl_if::cal_group_info& gi) const char* timeZone = thd->variables.time_zone->get_name()->ptr(); long timeZoneOffset; dataconvert::timeZoneToOffset(timeZone, strlen(timeZone), &timeZoneOffset); - gp_walk_info gwi(timeZoneOffset); + SubQuery* chain = nullptr; + gp_walk_info gwi(timeZoneOffset, &chain); gwi.thd = thd; gwi.isGroupByHandler = true; int status = getGroupPlan(gwi, *select_lex, csep, gi); @@ -9087,6 +9219,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro #ifdef DEBUG_WALK_COND cerr << "getGroupPlan()" << endl; #endif + idbassert_s(false, "getGroupPlan is utterly out of date"); // XXX: rollup is currently not supported (not tested) in this part. // but this is not triggered in any of tests. @@ -9187,21 +9320,21 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro SELECT_LEX* select_cursor = table_ptr->derived->first_select(); // Use Pushdown handler for subquery processing - FromSubQuery fromSub(gwi, select_cursor); + FromSubQuery* fromSub = new FromSubQuery(gwi, select_cursor); string alias(table_ptr->alias.str); if (lower_case_table_names) { boost::algorithm::to_lower(alias); } - fromSub.alias(alias); + fromSub->alias(alias); CalpontSystemCatalog::TableAliasName tn = make_aliasview("", "", alias, viewName); // @bug 3852. check return execplan - SCSEP plan = fromSub.transform(); + SCSEP plan = fromSub->transform(); if (!plan) { - setError(gwi.thd, ER_INTERNAL_ERROR, fromSub.gwip().parseErrorText, gwi); + setError(gwi.thd, ER_INTERNAL_ERROR, fromSub->gwip().parseErrorText, gwi); CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); return ER_INTERNAL_ERROR; } @@ -9453,7 +9586,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro bool redo = false; // empty rcWorkStack and ptWorkStack. They should all be empty by now. - clearStacks(gwi); + clearStacks(gwi, false); // indicate the starting pos of scalar returned column, because some join column // has been inserted to the returned column list. @@ -9903,7 +10036,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro // Having clause handling gwi.clauseType = HAVING; - clearStacks(gwi); + clearStacks(gwi, false); ParseTree* havingFilter = 0; // clear fatalParseError that may be left from post process functions gwi.fatalParseError = false; diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index 1f6a0031d..45db727b9 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -128,6 +128,7 @@ using namespace funcexp; #include "ha_mcs_datatype.h" #include "statistics.h" #include "ha_mcs_logging.h" +#include "ha_subquery.h" namespace cal_impl_if { @@ -332,7 +333,7 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci, long t std::vector& colTypes = ti.tpl_scan_ctx->ctp; - RowGroup* rowGroup = ti.tpl_scan_ctx->rowGroup; + std::shared_ptr rowGroup = ti.tpl_scan_ctx->rowGroup; // table mode mysql expects all columns of the table. mapping between columnoid and position in rowgroup // set coltype.position to be the position in rowgroup. only set once. @@ -656,7 +657,7 @@ vector getOnUpdateTimestampColumns(string& schema, string& tableName, in rowgroup::RGData rgData; ByteStream::quadbyte qb = 4; msg << qb; - rowgroup::RowGroup* rowGroup = 0; + std::unique_ptr rowGroup; uint32_t rowCount; exemgrClient->write(msg); @@ -709,7 +710,7 @@ vector getOnUpdateTimestampColumns(string& schema, string& tableName, in if (!rowGroup) { // This is mete data - rowGroup = new rowgroup::RowGroup(); + rowGroup.reset(new rowgroup::RowGroup()); rowGroup->deserialize(msg); qb = 100; msg.restart(); @@ -887,7 +888,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c //@Bug 2753. the memory already freed by destructor of UpdateSqlStatement if (isUpdateStatement(thd->lex->sql_command)) { - ColumnAssignment* columnAssignmentPtr; + ColumnAssignment* columnAssignmentPtr = nullptr; Item_field* item; List_iterator_fast field_it(thd->lex->first_select_lex()->item_list); List_iterator_fast value_it(thd->lex->value_list); @@ -919,6 +920,8 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c } else if (strcmp(tableName.c_str(), tmpTableName.c_str()) != 0) { + delete colAssignmentListPtr; + delete columnAssignmentPtr; //@ Bug3326 error out for multi table update string emsg(IDBErrorInfo::instance()->errorMsg(ERR_UPDATE_NOT_SUPPORT_FEATURE)); thd->raise_error_printf(ER_CHECK_NOT_IMPLEMENTED, emsg.c_str()); @@ -929,6 +932,8 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c if (!item->db_name.str) { + delete colAssignmentListPtr; + delete columnAssignmentPtr; //@Bug 5312. if subselect, wait until the schema info is available. if (thd->derived_tables_processing) return 0; @@ -1003,7 +1008,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c // sysdate() etc. if (!hasNonSupportItem && !cal_impl_if::nonConstFunc(ifp) && tmpVec.size() == 0) { - gp_walk_info gwi2(gwi.timeZone); + gp_walk_info gwi2(gwi.timeZone, gwi.subQueriesChain); gwi2.thd = thd; SRCP srcp(buildReturnedColumn(value, gwi2, gwi2.fatalParseError)); ConstantColumn* constCol = dynamic_cast(srcp.get()); @@ -1114,6 +1119,8 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c } else if (value->type() == Item::WINDOW_FUNC_ITEM) { + delete colAssignmentListPtr; + delete columnAssignmentPtr; setError(thd, ER_INTERNAL_ERROR, logging::IDBErrorInfo::instance()->errorMsg(ERR_WF_UPDATE)); return ER_CHECK_NOT_IMPLEMENTED; } @@ -1178,6 +1185,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c if (colAssignmentListPtr->empty() && isUpdateStatement(thd->lex->sql_command)) { ci->affectedRows = 0; + delete colAssignmentListPtr; return 0; } @@ -1205,7 +1213,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c boost::algorithm::to_lower(aTableName.table); } - CalpontDMLPackage* pDMLPackage = 0; + std::shared_ptr pDMLPackage; // dmlStmt += ";"; IDEBUG(cout << "STMT: " << dmlStmt << " and sessionID " << thd->thread_id << endl); VendorDMLStatement dmlStatement(dmlStmt, sessionID); @@ -1215,7 +1223,6 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c else dmlStatement.set_DMLStatementType(DML_DELETE); - TableName* qualifiedTablName = new TableName(); UpdateSqlStatement updateStmt; //@Bug 2753. To make sure the momory is freed. @@ -1223,10 +1230,11 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c if (isUpdateStatement(thd->lex->sql_command)) { + TableName* qualifiedTablName = new TableName(); qualifiedTablName->fName = tableName; qualifiedTablName->fSchema = schemaName; updateStmt.fNamePtr = qualifiedTablName; - pDMLPackage = CalpontDMLFactory::makeCalpontUpdatePackageFromMysqlBuffer(dmlStatement, updateStmt); + pDMLPackage.reset(CalpontDMLFactory::makeCalpontUpdatePackageFromMysqlBuffer(dmlStatement, updateStmt)); } else if ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) //@Bug 6121 error out on multi tables delete. { @@ -1246,9 +1254,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c boost::algorithm::to_lower(tableName); boost::algorithm::to_lower(aliasName); } - qualifiedTablName->fName = tableName; - qualifiedTablName->fSchema = schemaName; - pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(dmlStatement); + pDMLPackage.reset(CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(dmlStatement)); } else { @@ -1271,9 +1277,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c boost::algorithm::to_lower(tableName); boost::algorithm::to_lower(aliasName); } - qualifiedTablName->fName = tableName; - qualifiedTablName->fSchema = schemaName; - pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(dmlStatement); + pDMLPackage.reset(CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(dmlStatement)); } } else @@ -1288,9 +1292,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c boost::algorithm::to_lower(tableName); boost::algorithm::to_lower(aliasName); } - qualifiedTablName->fName = tableName; - qualifiedTablName->fSchema = schemaName; - pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(dmlStatement); + pDMLPackage.reset(CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(dmlStatement)); } if (!pDMLPackage) @@ -1549,7 +1551,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c updateCP->serialize(*plan); pDMLPackage->write(bytestream); - delete pDMLPackage; + pDMLPackage.reset(); ByteStream::byte b = 0; ByteStream::octbyte rows = 0; @@ -1633,12 +1635,12 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c // cout << "doUpdateDelete start new DMLProc client for ctrl-c " << " for session " << sessionID // << endl; VendorDMLStatement cmdStmt("CTRL+C", DML_COMMAND, sessionID); - CalpontDMLPackage* pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(cmdStmt); + std::shared_ptr pDMLPackage(CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(cmdStmt)); pDMLPackage->set_TimeZone(timeZoneOffset); ByteStream bytestream; bytestream << static_cast(sessionID); pDMLPackage->write(bytestream); - delete pDMLPackage; + pDMLPackage.reset(); b = 1; retry = maxRetries; errorMsg = "Command canceled by user"; @@ -1756,13 +1758,13 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c if (command != "") { VendorDMLStatement cmdStmt(command, DML_COMMAND, sessionID); - CalpontDMLPackage* pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(cmdStmt); + std::shared_ptr pDMLPackage(CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(cmdStmt)); pDMLPackage->set_TimeZone(timeZoneOffset); pDMLPackage->setTableOid(ci->tableOid); ByteStream bytestream; bytestream << static_cast(sessionID); pDMLPackage->write(bytestream); - delete pDMLPackage; + pDMLPackage.reset(); ByteStream::byte bc; std::string errMsg; @@ -2144,7 +2146,8 @@ int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows* affected_rows, const char* timeZone = thd->variables.time_zone->get_name()->ptr(); long timeZoneOffset; dataconvert::timeZoneToOffset(timeZone, strlen(timeZone), &timeZoneOffset); - cal_impl_if::gp_walk_info gwi(timeZoneOffset); + SubQueryChainHolder chainHolder; + cal_impl_if::gp_walk_info gwi(timeZoneOffset, &chainHolder.chain); gwi.thd = thd; int rc = 0; @@ -2166,6 +2169,7 @@ int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows* affected_rows, *affected_rows = ci->affectedRows; } + return rc; } @@ -2176,7 +2180,8 @@ int ha_mcs::impl_rnd_init(TABLE* table, const std::vector& condStack) const char* timeZone = thd->variables.time_zone->get_name()->ptr(); long timeZoneOffset; dataconvert::timeZoneToOffset(timeZone, strlen(timeZone), &timeZoneOffset); - gp_walk_info gwi(timeZoneOffset); + SubQueryChainHolder chainHolder; + gp_walk_info gwi(timeZoneOffset, &chainHolder.chain); gwi.thd = thd; if (thd->slave_thread && !get_replication_slave(thd) && @@ -2450,9 +2455,9 @@ int ha_mcs::impl_rnd_init(TABLE* table, const std::vector& condStack) ti = ci->tableMap[table]; ti.msTablePtr = table; - if (ti.tpl_ctx == nullptr) + if (!ti.tpl_ctx) { - ti.tpl_ctx = new sm::cpsm_tplh_t(); + ti.tpl_ctx.reset(new sm::cpsm_tplh_t()); ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t()); } @@ -2728,7 +2733,6 @@ int ha_mcs_impl_rnd_end(TABLE* table, bool is_pushdown_hand) else ci->cal_conn_hndl = hndl; - ti.tpl_ctx = 0; } catch (IDBExcept& e) { @@ -3731,6 +3735,13 @@ int ha_mcs_impl_delete_row(const uchar* buf) return 0; } +// this place is as good as any. +ext_cond_info::ext_cond_info(long timeZone) + : chainHolder(new SubQueryChainHolder()) + , gwi(timeZone, &chainHolder->chain) +{ +} + COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector& condStack) { THD* thd = current_thd; @@ -3759,7 +3770,8 @@ COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector& condSt const char* timeZone = thd->variables.time_zone->get_name()->ptr(); long timeZoneOffset; dataconvert::timeZoneToOffset(timeZone, strlen(timeZone), &timeZoneOffset); - gp_walk_info gwi(timeZoneOffset); + SubQueryChainHolder chainHolder; + gp_walk_info gwi(timeZoneOffset, &chainHolder.chain); gwi.condPush = true; gwi.sessionid = tid2sid(thd->thread_id); cout << "------------------ cond push -----------------------" << endl; @@ -3775,16 +3787,17 @@ COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector& condSt const char* timeZone = thd->variables.time_zone->get_name()->ptr(); long timeZoneOffset; dataconvert::timeZoneToOffset(timeZone, strlen(timeZone), &timeZoneOffset); - ti.condInfo = new gp_walk_info(timeZoneOffset); + ti.condInfo = new ext_cond_info(timeZoneOffset); } - gp_walk_info* gwi = ti.condInfo; + gp_walk_info* gwi = &ti.condInfo->gwi; gwi->dropCond = false; gwi->fatalParseError = false; gwi->condPush = true; gwi->thd = thd; gwi->sessionid = tid2sid(thd->thread_id); cond->traverse_cond(gp_walk, gwi, Item::POSTFIX); + clearDeleteStacks(*gwi); ci->tableMap[table] = ti; if (gwi->fatalParseError) @@ -4145,12 +4158,12 @@ int ha_mcs_impl_group_by_init(mcs_handler_info* handler_info, TABLE* table) mapiter = ci->tableMap.find(tl->table); if (mapiter != ci->tableMap.end() && mapiter->second.condInfo != NULL && - mapiter->second.condInfo->condPush) + mapiter->second.condInfo->gwi.condPush) { - while (!mapiter->second.condInfo->ptWorkStack.empty()) + while (!mapiter->second.condInfo->gwi.ptWorkStack.empty()) { - ptIt = mapiter->second.condInfo->ptWorkStack.top(); - mapiter->second.condInfo->ptWorkStack.pop(); + ptIt = mapiter->second.condInfo->gwi.ptWorkStack.top(); + mapiter->second.condInfo->gwi.ptWorkStack.pop(); gi.pushedPts.push_back(ptIt); } } @@ -4330,7 +4343,7 @@ int ha_mcs_impl_group_by_init(mcs_handler_info* handler_info, TABLE* table) { // MCOL-1601 Using stacks of ExeMgr conn hndls, table and scan contexts. - ti.tpl_ctx = new sm::cpsm_tplh_t(); + ti.tpl_ctx.reset(new sm::cpsm_tplh_t()); ti.tpl_ctx_st.push(ti.tpl_ctx); ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t()); ti.tpl_scan_ctx_st.push(ti.tpl_scan_ctx); @@ -4582,6 +4595,7 @@ int ha_mcs_impl_group_by_end(TABLE* table) { bool ask_4_stats = (ci->traceFlags) ? true : false; sm::tpl_close(ti.tpl_ctx, &hndl, ci->stats, ask_4_stats, clearScanCtx); + ti.tpl_ctx = 0; } // Normaly stats variables are set in external_lock method but we set it here // since they we pretend we are in vtable_disabled mode and the stats vars won't be set. @@ -4597,10 +4611,13 @@ int ha_mcs_impl_group_by_end(TABLE* table) ci->miniStats += hndl->miniStats; } } + else + { + ti.tpl_ctx.reset(); + } ci->cal_conn_hndl = hndl; - ti.tpl_ctx = 0; } catch (IDBExcept& e) { @@ -4679,7 +4696,8 @@ int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table, bool const char* timeZone = thd->variables.time_zone->get_name()->ptr(); long timeZoneOffset; dataconvert::timeZoneToOffset(timeZone, strlen(timeZone), &timeZoneOffset); - gp_walk_info gwi(timeZoneOffset); + SubQueryChainHolder chainHolder; + gp_walk_info gwi(timeZoneOffset, &chainHolder.chain); gwi.thd = thd; bool err = false; @@ -5036,7 +5054,7 @@ int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table, bool { if (ti.tpl_ctx == 0) { - ti.tpl_ctx = new sm::cpsm_tplh_t(); + ti.tpl_ctx.reset(new sm::cpsm_tplh_t()); ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t()); } @@ -5159,7 +5177,7 @@ int ha_mcs_impl_select_next(uchar* buf, TABLE* table, long timeZone) { if (ti.tpl_ctx == 0) { - ti.tpl_ctx = new sm::cpsm_tplh_t(); + ti.tpl_ctx.reset(new sm::cpsm_tplh_t()); ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t()); } diff --git a/dbcon/mysql/ha_mcs_impl_if.h b/dbcon/mysql/ha_mcs_impl_if.h index 7f75c4948..0a05c2035 100644 --- a/dbcon/mysql/ha_mcs_impl_if.h +++ b/dbcon/mysql/ha_mcs_impl_if.h @@ -179,7 +179,15 @@ struct gp_walk_info TableOnExprList tableOnExprList; std::vector condList; - gp_walk_info(long timeZone_) + // All SubQuery allocations are single-linked into this chain. + // At the end of gp_walk_info processing we can free whole chain at once. + // This is done so because the juggling of SubQuery pointers in the + // ha_mcs_execplan code. + // There is a struct SubQueryChainHolder down below to hold chain root and free + // the chain using sorta kinda RAII. + SubQuery** subQueriesChain; + + gp_walk_info(long timeZone_, SubQuery** subQueriesChain_) : sessionid(0) , fatalParseError(false) , condPush(false) @@ -204,12 +212,21 @@ struct gp_walk_info , timeZone(timeZone_) , inSubQueryLHS(nullptr) , inSubQueryLHSItem(nullptr) + , subQueriesChain(subQueriesChain_) { } + ~gp_walk_info(); - ~gp_walk_info() - { - } +}; + +struct SubQueryChainHolder; +struct ext_cond_info +{ + // having this as a direct field would introduce + // circular dependency on header inclusion with ha_subquery.h. + boost::shared_ptr chainHolder; + gp_walk_info gwi; + ext_cond_info(long timeZone); // needs knowledge on SubQueryChainHolder, will be defined elsewhere }; struct cal_table_info @@ -223,17 +240,14 @@ struct cal_table_info cal_table_info() : tpl_ctx(0), c(0), msTablePtr(0), conn_hndl(0), condInfo(0), moreRows(false) { } - ~cal_table_info() - { - } - sm::cpsm_tplh_t* tpl_ctx; - std::stack tpl_ctx_st; + sm::sp_cpsm_tplh_t tpl_ctx; + std::stack tpl_ctx_st; sm::sp_cpsm_tplsch_t tpl_scan_ctx; std::stack tpl_scan_ctx_st; unsigned c; // for debug purpose TABLE* msTablePtr; // no ownership sm::cpsm_conhdl_t* conn_hndl; - gp_walk_info* condInfo; + ext_cond_info* condInfo; execplan::SCSEP csep; bool moreRows; // are there more rows to consume (b/c of limit) }; @@ -400,6 +414,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& cse void setError(THD* thd, uint32_t errcode, const std::string errmsg, gp_walk_info* gwi); void setError(THD* thd, uint32_t errcode, const std::string errmsg); void gp_walk(const Item* item, void* arg); +void clearDeleteStacks(gp_walk_info& gwi); void parse_item(Item* item, std::vector& field_vec, bool& hasNonSupportItem, uint16& parseInfo, gp_walk_info* gwip = NULL); const std::string bestTableName(const Item_field* ifp); diff --git a/dbcon/mysql/ha_pseudocolumn.cpp b/dbcon/mysql/ha_pseudocolumn.cpp index 7bd75b6a7..1da0eaa01 100644 --- a/dbcon/mysql/ha_pseudocolumn.cpp +++ b/dbcon/mysql/ha_pseudocolumn.cpp @@ -475,6 +475,7 @@ execplan::ReturnedColumn* buildPseudoColumn(Item* item, gp_walk_info& gwi, bool& funcexp::Func_idbpartition* idbpartition = new funcexp::Func_idbpartition(); fc->operationType(idbpartition->operationType(parms, fc->resultType())); fc->alias(ifp->full_name() ? ifp->full_name() : ""); + delete idbpartition; return fc; } diff --git a/dbcon/mysql/ha_scalar_sub.cpp b/dbcon/mysql/ha_scalar_sub.cpp index c1014c9f4..e2afd8367 100644 --- a/dbcon/mysql/ha_scalar_sub.cpp +++ b/dbcon/mysql/ha_scalar_sub.cpp @@ -230,6 +230,7 @@ execplan::ParseTree* ScalarSub::buildParseTree(PredicateOperator* op) { fGwip.fatalParseError = true; fGwip.parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_INVALID_OPERATOR_WITH_LIST); + delete op; return NULL; } @@ -244,7 +245,7 @@ execplan::ParseTree* ScalarSub::buildParseTree(PredicateOperator* op) csep->subType(CalpontSelectExecutionPlan::SINGLEROW_SUBS); // gwi for the sub query - gp_walk_info gwi(fGwip.timeZone); + gp_walk_info gwi(fGwip.timeZone, fGwip.subQueriesChain); gwi.thd = fGwip.thd; gwi.subQuery = this; @@ -270,6 +271,7 @@ execplan::ParseTree* ScalarSub::buildParseTree(PredicateOperator* op) fGwip.parseErrorText = gwi.parseErrorText; } + delete op; return NULL; } diff --git a/dbcon/mysql/ha_select_sub.cpp b/dbcon/mysql/ha_select_sub.cpp index 5a83e245b..3939c11b4 100644 --- a/dbcon/mysql/ha_select_sub.cpp +++ b/dbcon/mysql/ha_select_sub.cpp @@ -67,7 +67,7 @@ SCSEP SelectSubQuery::transform() csep->subType(CalpontSelectExecutionPlan::SELECT_SUBS); // gwi for the sub query - gp_walk_info gwi(fGwip.timeZone); + gp_walk_info gwi(fGwip.timeZone, fGwip.subQueriesChain); gwi.thd = fGwip.thd; gwi.subQuery = this; diff --git a/dbcon/mysql/ha_subquery.h b/dbcon/mysql/ha_subquery.h index 029db0051..8195c520e 100644 --- a/dbcon/mysql/ha_subquery.h +++ b/dbcon/mysql/ha_subquery.h @@ -45,6 +45,8 @@ class SubQuery public: SubQuery(gp_walk_info& gwip) : fGwip(gwip), fCorrelated(false) { + next = *gwip.subQueriesChain; + *gwip.subQueriesChain = this; } virtual ~SubQuery() { @@ -68,11 +70,28 @@ class SubQuery { } + SubQuery* next; protected: gp_walk_info& fGwip; bool fCorrelated; }; +struct SubQueryChainHolder +{ + SubQuery* chain; + SubQueryChainHolder () : chain(nullptr) { } + ~SubQueryChainHolder () + { + while (chain) + { + SubQuery* next = chain->next; + delete chain; + chain = next; + } + } +}; + + /** * @brief A class to represent a generic WHERE clause subquery */ diff --git a/dbcon/mysql/ha_view.cpp b/dbcon/mysql/ha_view.cpp index 4912fd71d..0193d6e75 100644 --- a/dbcon/mysql/ha_view.cpp +++ b/dbcon/mysql/ha_view.cpp @@ -65,7 +65,7 @@ void View::transform() csep->sessionID(fParentGwip->sessionid); // gwi for the sub query - gp_walk_info gwi(fParentGwip->timeZone); + gp_walk_info gwi(fParentGwip->timeZone, fParentGwip->subQueriesChain); gwi.thd = fParentGwip->thd; uint32_t sessionID = csep->sessionID(); @@ -150,6 +150,7 @@ void View::transform() if (gwi.fatalParseError) { setError(gwi.thd, ER_INTERNAL_ERROR, gwi.parseErrorText); + delete csep; return; } } @@ -157,6 +158,7 @@ void View::transform() { setError(gwi.thd, ER_INTERNAL_ERROR, ie.what()); CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); + delete csep; return; } catch (...) @@ -164,6 +166,7 @@ void View::transform() string emsg = IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR); setError(gwi.thd, ER_INTERNAL_ERROR, emsg); CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); + delete csep; return; } @@ -176,6 +179,7 @@ void View::transform() // merge view list to parent fParentGwip->viewList.insert(fParentGwip->viewList.end(), gwi.viewList.begin(), gwi.viewList.end()); + gwi.viewList.clear(); // merge non-collapsed outer join to parent select stack tmpstack; @@ -191,6 +195,12 @@ void View::transform() fParentGwip->ptWorkStack.push(tmpstack.top()); tmpstack.pop(); } + while (!gwi.rcWorkStack.empty()) { + delete gwi.rcWorkStack.top(); + gwi.rcWorkStack.pop(); + } + + delete csep; } uint32_t View::processJoin(gp_walk_info& gwi, std::stack& outerJoinStack) diff --git a/dbcon/mysql/ha_window_function.cpp b/dbcon/mysql/ha_window_function.cpp index a30ba0b71..b3d992077 100644 --- a/dbcon/mysql/ha_window_function.cpp +++ b/dbcon/mysql/ha_window_function.cpp @@ -316,7 +316,7 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n Item_window_func* wf = (Item_window_func*)item; Item_sum* item_sum = wf->window_func(); string funcName = ConvertFuncName(item_sum); - WindowFunctionColumn* ac = new WindowFunctionColumn(funcName); + std::unique_ptr ac(new WindowFunctionColumn(funcName)); ac->timeZone(gwi.timeZone); ac->distinct(item_sum->has_with_distinct()); Window_spec* win_spec = wf->window_spec; @@ -902,8 +902,10 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n ac->charsetNumber(item->collation.collation->number); // put ac on windowFuncList - gwi.windowFuncList.push_back(ac); - return ac; + // we clone our managed pointer to put it into uunmanaged world. + WindowFunctionColumn* retAC = ac.release(); + gwi.windowFuncList.push_back(retAC); + return retAC; } } // namespace cal_impl_if diff --git a/dbcon/mysql/is_columnstore_extents.cpp b/dbcon/mysql/is_columnstore_extents.cpp index 0d6b00c3d..c12793cbf 100644 --- a/dbcon/mysql/is_columnstore_extents.cpp +++ b/dbcon/mysql/is_columnstore_extents.cpp @@ -187,7 +187,6 @@ static int generate_result(BRM::OID_t oid, BRM::DBRM* emp, TABLE* table, THD* th if (schema_table_store_record(thd, table)) { - delete emp; return 1; } @@ -197,13 +196,27 @@ static int generate_result(BRM::OID_t oid, BRM::DBRM* emp, TABLE* table, THD* th return 0; } +struct refresher +{ + BRM::DBRM* guarded; + refresher() + { + guarded = new BRM::DBRM(); + } + ~refresher() + { + delete guarded; + BRM::DBRM::refreshShm(); + } +}; static int is_columnstore_extents_fill(THD* thd, TABLE_LIST* tables, COND* cond) { BRM::OID_t cond_oid = 0; TABLE* table = tables->table; - BRM::DBRM::refreshShm(); - BRM::DBRM* emp = new BRM::DBRM(); + BRM::DBRM* emp; + refresher shmRefresher; + emp = shmRefresher.guarded; if (!emp || !emp->isDBRMReady()) { @@ -289,7 +302,6 @@ static int is_columnstore_extents_fill(THD* thd, TABLE_LIST* tables, COND* cond) return 1; } - delete emp; return 0; } diff --git a/dbcon/mysql/sm.cpp b/dbcon/mysql/sm.cpp index 098b2a655..caf6f156e 100644 --- a/dbcon/mysql/sm.cpp +++ b/dbcon/mysql/sm.cpp @@ -280,7 +280,7 @@ namespace sm { const std::string DEFAULT_SAVE_PATH = "/var/tmp"; -status_t tpl_open(tableid_t tableid, cpsm_tplh_t* ntplh, cpsm_conhdl_t* conn_hdl) +status_t tpl_open(tableid_t tableid, sp_cpsm_tplh_t& ntplh, cpsm_conhdl_t* conn_hdl) { SMDEBUGLOG << "tpl_open: ntplh: " << ntplh << " conn_hdl: " << conn_hdl << " tableid: " << tableid << endl; @@ -359,7 +359,7 @@ status_t tpl_scan_close(sp_cpsm_tplsch_t& ntplsch) return STATUS_OK; } -status_t tpl_close(cpsm_tplh_t* ntplh, cpsm_conhdl_t** conn_hdl, QueryStats& stats, bool ask_4_stats, +status_t tpl_close(sp_cpsm_tplh_t& ntplh, cpsm_conhdl_t** conn_hdl, QueryStats& stats, bool ask_4_stats, bool clear_scan_ctx) { cpsm_conhdl_t* hndl = *conn_hdl; @@ -369,7 +369,7 @@ status_t tpl_close(cpsm_tplh_t* ntplh, cpsm_conhdl_t** conn_hdl, QueryStats& sta SMDEBUGLOG << " tableid: " << ntplh->tableid; SMDEBUGLOG << endl; - delete ntplh; + ntplh.reset(); // determine end of result set and end of statement execution if (hndl->queryState == QUERY_IN_PROCESS) diff --git a/dbcon/mysql/sm.h b/dbcon/mysql/sm.h index 06ad59ba2..0e055f8e0 100644 --- a/dbcon/mysql/sm.h +++ b/dbcon/mysql/sm.h @@ -137,12 +137,11 @@ struct cpsm_tplsch_t } ~cpsm_tplsch_t() { - delete rowGroup; } tableid_t tableid; uint64_t rowsreturned; - rowgroup::RowGroup* rowGroup; + std::shared_ptr rowGroup; messageqcpp::ByteStream bs; // rowgroup bytestream. need to stay with the life span of rowgroup uint32_t traceFlags; // @bug 649 @@ -158,7 +157,7 @@ struct cpsm_tplsch_t { if (!rowGroup) { - rowGroup = new rowgroup::RowGroup(); + rowGroup.reset(new rowgroup::RowGroup()); rowGroup->deserialize(bs); } else @@ -280,6 +279,7 @@ struct cpsm_tplh_t uint16_t saveFlag; int bandsInTable; }; +typedef std::shared_ptr sp_cpsm_tplh_t; struct cpsm_tid_t { @@ -293,11 +293,11 @@ struct cpsm_tid_t extern status_t sm_init(uint32_t, cpsm_conhdl_t**, uint32_t columnstore_local_query = false); extern status_t sm_cleanup(cpsm_conhdl_t*); -extern status_t tpl_open(tableid_t, cpsm_tplh_t*, cpsm_conhdl_t*); +extern status_t tpl_open(tableid_t, sp_cpsm_tplh_t&, cpsm_conhdl_t*); extern status_t tpl_scan_open(tableid_t, sp_cpsm_tplsch_t&, cpsm_conhdl_t*); extern status_t tpl_scan_fetch(sp_cpsm_tplsch_t&, cpsm_conhdl_t*, int* k = 0); extern status_t tpl_scan_close(sp_cpsm_tplsch_t&); -extern status_t tpl_close(cpsm_tplh_t*, cpsm_conhdl_t**, querystats::QueryStats& stats, bool ask_4_stats, +extern status_t tpl_close(sp_cpsm_tplh_t&, cpsm_conhdl_t**, querystats::QueryStats& stats, bool ask_4_stats, bool clear_scan_ctx = false); } // namespace sm diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index b7632f301..549f92a7c 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -33,10 +33,9 @@ if (WITH_UNITTESTS) target_link_libraries(rowgroup_tests ${ENGINE_LDFLAGS} ${GTEST_LIBRARIES} ${ENGINE_EXEC_LIBS}) gtest_add_tests(TARGET rowgroup_tests TEST_PREFIX columnstore:) - add_executable(rewritetest rewritetest.cpp) add_dependencies(rewritetest googletest) - target_link_libraries(rewritetest ${ENGINE_LDFLAGS} ${GTEST_LIBRARIES} ${ENGINE_EXEC_LIBS} messageqcpp execplan) + target_link_libraries(rewritetest ${ENGINE_LDFLAGS} ${GTEST_LIBRARIES} ${ENGINE_EXEC_LIBS}) gtest_add_tests(TARGET rewritetest TEST_PREFIX columnstore:) add_executable(mcs_decimal_tests mcs_decimal-tests.cpp) diff --git a/tests/scripts/fullmtr.sh b/tests/scripts/fullmtr.sh index 53f5ef20f..f3b16b570 100644 --- a/tests/scripts/fullmtr.sh +++ b/tests/scripts/fullmtr.sh @@ -7,6 +7,9 @@ CURRENT_DIR=`pwd` mysql -e "create database if not exists test;" SOCKET=`mysql -e "show variables like 'socket';" | grep socket | cut -f2` +export ASAN_OPTIONS=abort_on_error=1:disable_coredump=0,print_stats=false,detect_odr_violation=0,check_initialization_order=1,detect_stack_use_after_return=1,atexit=false,log_path=/core/asan.hz + + cd ${INSTALLED_MTR_PATH} if [[ ! -d ${COLUMSNTORE_MTR_INSTALLED} ]]; then @@ -21,7 +24,15 @@ if [[ ! -d '/data/qa/source/dbt3/' || ! -d '/data/qa/source/ssb/' ]]; then fi run_suite() { - ./mtr --force --max-test-fail=0 --testcase-timeout=60 --extern socket=$SOCKET --suite=columnstore/$1 $2 | tee $CURRENT_DIR/mtr.$1.log 2>&1 + ls /core >$CURRENT_DIR/mtr.$1.cores-before + ./mtr --force --max-test-fail=0 --testcase-timeout=60 --suite=columnstore/$1 $2 | tee $CURRENT_DIR/mtr.$1.log 2>&1 + # dump analyses. + systemctl stop mariadb + systemctl start mariadb + ls /core >$CURRENT_DIR/mtr.$1.cores-after + echo "reports or coredumps:" + diff -u $CURRENT_DIR/mtr.$1.cores-before $CURRENT_DIR/mtr.$1.cores-after && echo "no new reports or coredumps" + rm $CURRENT_DIR/mtr.$1.cores-before $CURRENT_DIR/mtr.$1.cores-after } if (( $# == 2 )); then @@ -29,8 +40,9 @@ if (( $# == 2 )); then exit 1 fi -run_suite setup +run_suite basic run_suite bugfixes +run_suite setup run_suite devregression run_suite autopilot run_suite extended diff --git a/versioning/BRM/brmshmimpl.cpp b/versioning/BRM/brmshmimpl.cpp index 82bef3b7c..f901be3fc 100644 --- a/versioning/BRM/brmshmimpl.cpp +++ b/versioning/BRM/brmshmimpl.cpp @@ -422,6 +422,7 @@ BRMManagedShmImplRBTree::BRMManagedShmImplRBTree(unsigned key, off_t size, bool BRMManagedShmImplRBTree::~BRMManagedShmImplRBTree() { + delete fShmSegment; } void BRMManagedShmImplRBTree::setReadOnly() diff --git a/writeengine/bulk/we_bulkload.cpp b/writeengine/bulk/we_bulkload.cpp index 28991ff95..56aacfafe 100644 --- a/writeengine/bulk/we_bulkload.cpp +++ b/writeengine/bulk/we_bulkload.cpp @@ -70,7 +70,7 @@ const std::string ERR_LOG_SUFFIX = ".err"; // Job err log file suffix // extern WriteEngine::BRMWrapper* brmWrapperPtr; namespace WriteEngine { -/* static */ boost::ptr_vector BulkLoad::fTableInfo; +/* static */ std::vector> BulkLoad::fTableInfo; /* static */ boost::mutex* BulkLoad::fDDLMutex = 0; /* static */ const std::string BulkLoad::DIR_BULK_JOB("job"); @@ -519,7 +519,7 @@ void BulkLoad::spawnWorkers() // NO_ERROR if success // other if fail //------------------------------------------------------------------------------ -int BulkLoad::preProcess(Job& job, int tableNo, TableInfo* tableInfo) +int BulkLoad::preProcess(Job& job, int tableNo, std::shared_ptr& tableInfo) { int rc = NO_ERROR, minWidth = 9999; // give a big number HWM minHWM = 999999; // rp 9/25/07 Bug 473 @@ -701,7 +701,7 @@ int BulkLoad::preProcess(Job& job, int tableNo, TableInfo* tableInfo) << "Table-" << job.jobTableList[tableNo].tblName << "..."; fLog.logMsg(oss11.str(), MSGLVL_INFO2); - rc = saveBulkRollbackMetaData(job, tableInfo, segFileInfo, dbRootHWMInfoColVec); + rc = saveBulkRollbackMetaData(job, tableInfo.get(), segFileInfo, dbRootHWMInfoColVec); if (rc != NO_ERROR) { @@ -733,10 +733,10 @@ int BulkLoad::preProcess(Job& job, int tableNo, TableInfo* tableInfo) if (job.jobTableList[tableNo].colList[i].compressionType) info = new ColumnInfoCompressed(&fLog, i, job.jobTableList[tableNo].colList[i], pDBRootExtentTracker, - tableInfo); + tableInfo.get()); // tableInfo->rbMetaWriter()); else - info = new ColumnInfo(&fLog, i, job.jobTableList[tableNo].colList[i], pDBRootExtentTracker, tableInfo); + info = new ColumnInfo(&fLog, i, job.jobTableList[tableNo].colList[i], pDBRootExtentTracker, tableInfo.get()); if (pwd) info->setUIDGID(pwd->pw_uid, pwd->pw_gid); @@ -840,7 +840,7 @@ int BulkLoad::preProcess(Job& job, int tableNo, TableInfo* tableInfo) if (rc) return rc; - fTableInfo.push_back(tableInfo); + fTableInfo.push_back(std::shared_ptr(tableInfo)); return NO_ERROR; } @@ -1039,19 +1039,19 @@ int BulkLoad::processJob() //-------------------------------------------------------------------------- // Validate the existence of the import data files //-------------------------------------------------------------------------- - std::vector tables; + std::vector> tables; for (i = 0; i < curJob.jobTableList.size(); i++) { - TableInfo* tableInfo = new TableInfo(&fLog, fTxnID, fProcessName, curJob.jobTableList[i].mapOid, - curJob.jobTableList[i].tblName, fKeepRbMetaFiles); + std::shared_ptr tableInfo(new TableInfo(&fLog, fTxnID, fProcessName, curJob.jobTableList[i].mapOid, + curJob.jobTableList[i].tblName, fKeepRbMetaFiles)); if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) || (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC)) tableInfo->setBulkLoadMode(fBulkMode, fBRMRptFileName); tableInfo->setErrorDir(string(getErrorDir())); tableInfo->setTruncationAsError(getTruncationAsError()); - rc = manageImportDataFileList(curJob, i, tableInfo); + rc = manageImportDataFileList(curJob, i, tableInfo.get()); if (rc != NO_ERROR) { @@ -1495,7 +1495,7 @@ int BulkLoad::rollbackLockedTables() for (unsigned i = 0; i < fTableInfo.size(); i++) { - if (fTableInfo[i].isTableLocked()) + if (fTableInfo[i]->isTableLocked()) { lockedTableFound = true; break; @@ -1509,10 +1509,10 @@ int BulkLoad::rollbackLockedTables() // Report the tables that were successfully loaded for (unsigned i = 0; i < fTableInfo.size(); i++) { - if (!fTableInfo[i].isTableLocked()) + if (!fTableInfo[i]->isTableLocked()) { ostringstream oss; - oss << "Table " << fTableInfo[i].getTableName() << " was successfully loaded. "; + oss << "Table " << fTableInfo[i]->getTableName() << " was successfully loaded. "; fLog.logMsg(oss.str(), MSGLVL_INFO1); } } @@ -1520,24 +1520,24 @@ int BulkLoad::rollbackLockedTables() // Report the tables that were not successfully loaded for (unsigned i = 0; i < fTableInfo.size(); i++) { - if (fTableInfo[i].isTableLocked()) + if (fTableInfo[i]->isTableLocked()) { - if (fTableInfo[i].hasProcessingBegun()) + if (fTableInfo[i]->hasProcessingBegun()) { ostringstream oss; - oss << "Table " << fTableInfo[i].getTableName() << " (OID-" << fTableInfo[i].getTableOID() << ")" + oss << "Table " << fTableInfo[i]->getTableName() << " (OID-" << fTableInfo[i]->getTableOID() << ")" << " was not successfully loaded. Rolling back."; fLog.logMsg(oss.str(), MSGLVL_INFO1); } else { ostringstream oss; - oss << "Table " << fTableInfo[i].getTableName() << " (OID-" << fTableInfo[i].getTableOID() << ")" + oss << "Table " << fTableInfo[i]->getTableName() << " (OID-" << fTableInfo[i]->getTableOID() << ")" << " did not start loading. No rollback necessary."; fLog.logMsg(oss.str(), MSGLVL_INFO1); } - rc = rollbackLockedTable(fTableInfo[i]); + rc = rollbackLockedTable(*fTableInfo[i]); if (rc != NO_ERROR) { @@ -1603,9 +1603,9 @@ bool BulkLoad::addErrorMsg2BrmUpdater(const std::string& tablename, const ostrin for (int tableId = 0; tableId < size; tableId++) { - if (fTableInfo[tableId].getTableName() == tablename) + if (fTableInfo[tableId]->getTableName() == tablename) { - fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str()); + fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str()); return true; } } diff --git a/writeengine/bulk/we_bulkload.h b/writeengine/bulk/we_bulkload.h index 39841b23a..926ff6c93 100644 --- a/writeengine/bulk/we_bulkload.h +++ b/writeengine/bulk/we_bulkload.h @@ -77,7 +77,7 @@ class BulkLoad : public FileOp /** * @brief Pre process jobs to validate and assign values to the job structure */ - int preProcess(Job& job, int tableNo, TableInfo* tableInfo); + int preProcess(Job& job, int tableNo, std::shared_ptr& tableInfo); /** * @brief Print job information @@ -194,7 +194,7 @@ class BulkLoad : public FileOp std::string fAlternateImportDir; // Alternate bulk import directory std::string fErrorDir; // Opt. where error records record std::string fProcessName; // Application process name - static boost::ptr_vector fTableInfo; // Vector of Table information + static std::vector> fTableInfo; // Vector of Table information int fNoOfParseThreads; // Number of parse threads int fNoOfReadThreads; // Number of read threads boost::thread_group fReadThreads; // Read thread group diff --git a/writeengine/bulk/we_extentstripealloc.cpp b/writeengine/bulk/we_extentstripealloc.cpp index 8482cd834..98463e1b3 100644 --- a/writeengine/bulk/we_extentstripealloc.cpp +++ b/writeengine/bulk/we_extentstripealloc.cpp @@ -140,7 +140,7 @@ int ExtentStripeAlloc::allocateExtent(OID oid, uint16_t dbRoot, startLbid = extentEntryIter->second.fStartLbid; allocSize = extentEntryIter->second.fAllocSize; hwm = extentEntryIter->second.fHwm; - errMsg = extentEntryIter->second.fStatusMsg; + errMsg = *extentEntryIter->second.fStatusMsg; retStatus = extentEntryIter->second.fStatus; fMap.erase(extentEntryIter); @@ -274,7 +274,7 @@ void ExtentStripeAlloc::print() << "; seg: " << iter->second.fSegNum << "; lbid: " << iter->second.fStartLbid << "; size: " << iter->second.fAllocSize << "; hwm: " << iter->second.fHwm << "; stripe: " << iter->second.fStripeKey << "; stat: " << iter->second.fStatus - << "; msg: " << iter->second.fStatusMsg; + << "; msg: " << *iter->second.fStatusMsg; } } else diff --git a/writeengine/bulk/we_extentstripealloc.h b/writeengine/bulk/we_extentstripealloc.h index 7d30d4dbe..acdee123e 100644 --- a/writeengine/bulk/we_extentstripealloc.h +++ b/writeengine/bulk/we_extentstripealloc.h @@ -46,21 +46,6 @@ class Log; class AllocExtEntry { public: - // Default constructor - AllocExtEntry() - : fOid(0) - , fColWidth(0) - , fDbRoot(0) - , fPartNum(0) - , fSegNum(0) - , fStartLbid(0) - , fAllocSize(0) - , fHwm(0) - , fStatus(NO_ERROR) - , fStripeKey(0) - { - } - // Used to create entry for an existing extent we are going to add data to. AllocExtEntry(OID& oid, int colWidth, uint16_t dbRoot, uint32_t partNum, uint16_t segNum, BRM::LBID_t startLbid, int allocSize, HWM hwm, int status, const std::string& statusMsg, @@ -74,22 +59,22 @@ class AllocExtEntry , fAllocSize(allocSize) , fHwm(hwm) , fStatus(status) - , fStatusMsg(statusMsg) + , fStatusMsg(new std::string(statusMsg)) , fStripeKey(stripeKey) { } - OID fOid; // column OID - int fColWidth; // colum width (in bytes) - uint16_t fDbRoot; // DBRoot of allocated extent - uint32_t fPartNum; // Partition number of allocated extent - uint16_t fSegNum; // Segment number of allocated extent - BRM::LBID_t fStartLbid; // Starting LBID of allocated extent - int fAllocSize; // Number of allocated LBIDS - HWM fHwm; // Starting fbo or hwm of allocated extent - int fStatus; // Status of extent allocation - std::string fStatusMsg; // Status msg of extent allocation - unsigned int fStripeKey; // "Stripe" identifier for this extent + OID fOid = 0; // column OID + int fColWidth = 0; // colum width (in bytes) + uint16_t fDbRoot = 0; // DBRoot of allocated extent + uint32_t fPartNum = 0; // Partition number of allocated extent + uint16_t fSegNum = 0; // Segment number of allocated extent + BRM::LBID_t fStartLbid = 0; // Starting LBID of allocated extent + int fAllocSize = 0; // Number of allocated LBIDS + HWM fHwm = 0; // Starting fbo or hwm of allocated extent + int fStatus = NO_ERROR; // Status of extent allocation + std::shared_ptr fStatusMsg{new std::string()}; // Status msg of extent allocation + unsigned int fStripeKey = 0; // "Stripe" identifier for this extent }; //------------------------------------------------------------------------------ diff --git a/writeengine/bulk/we_tableinfo.cpp b/writeengine/bulk/we_tableinfo.cpp index 011c70b5c..6b23fa26c 100644 --- a/writeengine/bulk/we_tableinfo.cpp +++ b/writeengine/bulk/we_tableinfo.cpp @@ -178,7 +178,6 @@ TableInfo::TableInfo(Log* logger, const BRM::TxnID txnID, const string& processN TableInfo::~TableInfo() { fBRMReporter.sendErrMsgToFile(fBRMRptFileName); - freeProcessingBuffers(); } //------------------------------------------------------------------------------ diff --git a/writeengine/bulk/we_workers.cpp b/writeengine/bulk/we_workers.cpp index dcd4f1d54..26870e88d 100644 --- a/writeengine/bulk/we_workers.cpp +++ b/writeengine/bulk/we_workers.cpp @@ -95,16 +95,16 @@ void BulkLoad::read(int id) #ifdef PROFILE Stats::stopReadEvent(WE_STATS_WAIT_TO_SELECT_TBL); #endif - int rc = fTableInfo[tableId].readTableData(); + int rc = fTableInfo[tableId]->readTableData(); if (rc != NO_ERROR) { // Error occurred while reading the data, break out of loop. BulkStatus::setJobStatus(EXIT_FAILURE); ostringstream oss; - oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId].getTableName() + oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName() << ". Terminating this job."; - fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str()); + fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str()); fLog.logMsg(oss.str(), rc, MSGLVL_CRITICAL); break; } @@ -117,7 +117,7 @@ void BulkLoad::read(int id) if (tableId != -1) oss << "Bulkload Read (thread " << id << ") Stopped reading Table " - << fTableInfo[tableId].getTableName() << ". " << ex.what(); + << fTableInfo[tableId]->getTableName() << ". " << ex.what(); else oss << "Bulkload Read (thread " << id << ") Stopped reading Tables. " << ex.what(); @@ -129,14 +129,14 @@ void BulkLoad::read(int id) ostringstream oss; if (tableId != -1) - oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId].getTableName() + oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName() << ". " << ex.what() << ". Terminating this job."; else oss << "Bulkload Read (thread " << id << ") Failed for Table. " << ex.what() << ". Terminating this job."; if (tableId != -1) - fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str()); + fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str()); fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_CRITICAL); } @@ -146,13 +146,13 @@ void BulkLoad::read(int id) ostringstream oss; if (tableId != -1) - oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId].getTableName() + oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName() << ". Terminating this job."; else oss << "Bulkload Read (thread " << id << ") Failed for Table. Terminating this job."; if (tableId != -1) - fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str()); + fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str()); fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_CRITICAL); } @@ -170,7 +170,7 @@ int BulkLoad::lockTableForRead(int id) for (unsigned i = 0; i < fTableInfo.size(); ++i) { - if (fTableInfo[i].lockForRead(id)) + if (fTableInfo[i]->lockForRead(id)) return i; } @@ -292,16 +292,16 @@ void BulkLoad::parse(int id) // Have obtained the table and column for parsing. // Start parsing the column data. double processingTime; - int rc = fTableInfo[tableId].parseColumn(columnId, myParseBuffer, processingTime); + int rc = fTableInfo[tableId]->parseColumn(columnId, myParseBuffer, processingTime); if (rc != NO_ERROR) { // Error occurred while parsing the data, break out of loop. BulkStatus::setJobStatus(EXIT_FAILURE); ostringstream oss; - oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId].getTableName() + oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName() << " during parsing. Terminating this job."; - fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str()); + fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str()); fLog.logMsg(oss.str(), rc, MSGLVL_CRITICAL); setParseErrorOnTable(tableId, true); @@ -310,7 +310,7 @@ void BulkLoad::parse(int id) // Parsing is complete. Acquire the mutex and increment // the parsingComplete value for the buffer - if (fTableInfo[tableId].getStatusTI() != WriteEngine::ERR) + if (fTableInfo[tableId]->getStatusTI() != WriteEngine::ERR) { #ifdef PROFILE Stats::startParseEvent(WE_STATS_WAIT_TO_COMPLETE_PARSE); @@ -320,15 +320,15 @@ void BulkLoad::parse(int id) Stats::stopParseEvent(WE_STATS_WAIT_TO_COMPLETE_PARSE); Stats::startParseEvent(WE_STATS_COMPLETING_PARSE); #endif - rc = fTableInfo[tableId].setParseComplete(columnId, myParseBuffer, processingTime); + rc = fTableInfo[tableId]->setParseComplete(columnId, myParseBuffer, processingTime); if (rc != NO_ERROR) { BulkStatus::setJobStatus(EXIT_FAILURE); ostringstream oss; oss << "Bulkload Parse (thread " << id << ") Failed for Table " - << fTableInfo[tableId].getTableName() << " during parse completion. Terminating this job."; - fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str()); + << fTableInfo[tableId]->getTableName() << " during parse completion. Terminating this job."; + fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str()); fLog.logMsg(oss.str(), rc, MSGLVL_CRITICAL); setParseErrorOnTable(tableId, false); @@ -349,7 +349,7 @@ void BulkLoad::parse(int id) if (tableId != -1) { oss << "Bulkload Parse (thread " << id << ") Stopped parsing Table " - << fTableInfo[tableId].getTableName() << ". " << ex.what(); + << fTableInfo[tableId]->getTableName() << ". " << ex.what(); setParseErrorOnTable(tableId, true); } @@ -367,11 +367,11 @@ void BulkLoad::parse(int id) if (tableId != -1) { - oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId].getTableName() + oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName() << ". " << ex.what() << ". Terminating this job."; setParseErrorOnTable(tableId, true); - fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str()); + fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str()); } else { @@ -388,11 +388,11 @@ void BulkLoad::parse(int id) if (tableId != -1) { - oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId].getTableName() + oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName() << ". Terminating this job."; setParseErrorOnTable(tableId, true); - fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str()); + fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str()); } else { @@ -421,10 +421,10 @@ bool BulkLoad::lockColumnForParse(int thrdId, int& tableId, int& columnId, int& for (unsigned i = 0; i < fTableInfo.size(); ++i) { - if (fTableInfo[i].getStatusTI() == WriteEngine::PARSE_COMPLETE) + if (fTableInfo[i]->getStatusTI() == WriteEngine::PARSE_COMPLETE) continue; - int currentParseBuffer = fTableInfo[i].getCurrentParseBuffer(); + int currentParseBuffer = fTableInfo[i]->getCurrentParseBuffer(); myParseBuffer = currentParseBuffer; do @@ -434,7 +434,7 @@ bool BulkLoad::lockColumnForParse(int thrdId, int& tableId, int& columnId, int& { ostringstream oss; std::string bufStatusStr; - Status stat = fTableInfo[i].getStatusTI(); + Status stat = fTableInfo[i]->getStatusTI(); ColumnInfo::convertStatusToString(stat, bufStatusStr); oss << " - " << pthread_self() << ":fTableInfo[" << i << "]" << bufStatusStr << " (" << stat << ")"; @@ -452,13 +452,13 @@ bool BulkLoad::lockColumnForParse(int thrdId, int& tableId, int& columnId, int& // @bug2099- // get a buffer and column to parse if available. - if ((columnId = fTableInfo[i].getColumnForParse(thrdId, myParseBuffer, report)) != -1) + if ((columnId = fTableInfo[i]->getColumnForParse(thrdId, myParseBuffer, report)) != -1) { tableId = i; return true; } - myParseBuffer = (myParseBuffer + 1) % fTableInfo[i].getNumberOfBuffers(); + myParseBuffer = (myParseBuffer + 1) % fTableInfo[i]->getNumberOfBuffers(); } while (myParseBuffer != currentParseBuffer); } @@ -479,10 +479,10 @@ bool BulkLoad::allTablesDone(Status status) { for (unsigned i = 0; i < fTableInfo.size(); ++i) { - if (fTableInfo[i].getStatusTI() == WriteEngine::ERR) + if (fTableInfo[i]->getStatusTI() == WriteEngine::ERR) return true; - if (fTableInfo[i].getStatusTI() != status) + if (fTableInfo[i]->getStatusTI() != status) return false; } @@ -499,11 +499,11 @@ void BulkLoad::setParseErrorOnTable(int tableId, bool lockParseMutex) if (lockParseMutex) { boost::mutex::scoped_lock lock(fParseMutex); - fTableInfo[tableId].setParseError(); + fTableInfo[tableId]->setParseError(); } else { - fTableInfo[tableId].setParseError(); + fTableInfo[tableId]->setParseError(); } } diff --git a/writeengine/shared/we_brm.cpp b/writeengine/shared/we_brm.cpp index e065630b5..ffc783c78 100644 --- a/writeengine/shared/we_brm.cpp +++ b/writeengine/shared/we_brm.cpp @@ -57,6 +57,7 @@ using namespace execplan; namespace WriteEngine { BRMWrapper* volatile BRMWrapper::m_instance = NULL; +std::atomic BRMWrapper::finishReported(false); boost::thread_specific_ptr BRMWrapper::m_ThreadDataPtr; boost::mutex BRMWrapper::m_instanceCreateMutex; @@ -750,6 +751,10 @@ uint8_t BRMWrapper::newCpimportJob(uint32_t &jobId) void BRMWrapper::finishCpimportJob(uint32_t jobId) { + if (finishReported.exchange(true)) // get old and set to true; if old is true, do nothing. + { + return; + } blockRsltnMgrPtr->finishCpimportJob(jobId); } diff --git a/writeengine/shared/we_brm.h b/writeengine/shared/we_brm.h index d8bc403ae..e091e38ae 100644 --- a/writeengine/shared/we_brm.h +++ b/writeengine/shared/we_brm.h @@ -23,6 +23,7 @@ #pragma once +#include #include #include #include @@ -474,6 +475,8 @@ class BRMWrapper : public WEObj static IDBDataFile* m_curVBFile; BRM::DBRM* blockRsltnMgrPtr; + + EXPORT static std::atomic finishReported; }; //------------------------------------------------------------------------------