diff --git a/dbcon/dmlpackage/dml.y b/dbcon/dmlpackage/dml.y index b26319c0b..73dcf26ea 100644 --- a/dbcon/dmlpackage/dml.y +++ b/dbcon/dmlpackage/dml.y @@ -566,17 +566,11 @@ assignment_commalist: assignment: column COMPARISON scalar_exp { - $$ = new ColumnAssignment(); - $$->fColumn = $1; - $$->fOperator = $2; - $$->fScalarExpression = $3; + $$ = new ColumnAssignment($1, $2, $3); } | column COMPARISON NULLX { - $$ = new ColumnAssignment(); - $$->fColumn = $1; - $$->fOperator = $2; - $$->fScalarExpression = $3; + $$ = new ColumnAssignment($1, $2, $3); } ; diff --git a/dbcon/dmlpackage/dmlpkg.h b/dbcon/dmlpackage/dmlpkg.h index 477e0f608..f7377f8a3 100644 --- a/dbcon/dmlpackage/dmlpkg.h +++ b/dbcon/dmlpackage/dmlpkg.h @@ -409,6 +409,14 @@ public: class ColumnAssignment { public: + explicit ColumnAssignment( + std::string const& column, + std::string const& op = "=", + std::string const& expr = "") : + fColumn(column), fOperator(op), fScalarExpression(expr), + fFromCol(false), fFuncScale(0), fIsNull(false) + {}; + /** @brief dump to stdout */ std::ostream& put(std::ostream& os) const; @@ -423,6 +431,7 @@ public: std::string fScalarExpression; bool fFromCol; uint32_t fFuncScale; + bool fIsNull; }; /** @brief Stores a value list or a query specification diff --git a/dbcon/dmlpackage/updatedmlpackage.cpp b/dbcon/dmlpackage/updatedmlpackage.cpp index a68e42fba..23b42a8dd 100644 --- a/dbcon/dmlpackage/updatedmlpackage.cpp +++ b/dbcon/dmlpackage/updatedmlpackage.cpp @@ -259,7 +259,8 @@ void UpdateDMLPackage::buildUpdateFromMysqlBuffer(UpdateSqlStatement& updateStm while (iter != updateStmt.fColAssignmentListPtr->end()) { ColumnAssignment* colaPtr = *iter; - DMLColumn* colPtr = new DMLColumn(colaPtr->fColumn, colaPtr->fScalarExpression, colaPtr->fFromCol, colaPtr->fFuncScale); + DMLColumn* colPtr = new DMLColumn(colaPtr->fColumn, colaPtr->fScalarExpression, colaPtr->fFromCol, colaPtr->fFuncScale, + colaPtr->fIsNull); rowPtr->get_ColumnList().push_back(colPtr); ++iter; diff --git a/dbcon/execplan/arithmeticcolumn.cpp b/dbcon/execplan/arithmeticcolumn.cpp index aab6c9265..2128a47ed 100644 --- a/dbcon/execplan/arithmeticcolumn.cpp +++ b/dbcon/execplan/arithmeticcolumn.cpp @@ -326,7 +326,8 @@ void ArithmeticColumn::serialize(messageqcpp::ByteStream& b) const ObjectReader::writeParseTree(fExpression, b); b << fTableAlias; b << fData; - b << static_cast(fAsc); + const ByteStream::doublebyte tmp = fAsc; + b << tmp; } void ArithmeticColumn::unserialize(messageqcpp::ByteStream& b) @@ -340,7 +341,9 @@ void ArithmeticColumn::unserialize(messageqcpp::ByteStream& b) fExpression = ObjectReader::createParseTree(b); b >> fTableAlias; b >> fData; - b >> reinterpret_cast< ByteStream::doublebyte&>(fAsc); + ByteStream::doublebyte tmp; + b >> tmp; + fAsc = (tmp); fSimpleColumnList.clear(); fExpression->walk(getSimpleCols, &fSimpleColumnList); diff --git a/dbcon/execplan/calpontselectexecutionplan.h b/dbcon/execplan/calpontselectexecutionplan.h index 3e4eedcbf..e21cd96bb 100644 --- a/dbcon/execplan/calpontselectexecutionplan.h +++ b/dbcon/execplan/calpontselectexecutionplan.h @@ -548,6 +548,10 @@ public: return fDerivedTbAlias; } + void derivedTbView(const std::string derivedTbView) { fDerivedTbView = derivedTbView; } + const std::string derivedTbView() const { return fDerivedTbView; } + + void limitStart(const uint64_t limitStart) { fLimitStart = limitStart; @@ -870,6 +874,7 @@ private: // for subselect uint64_t fSubType; std::string fDerivedTbAlias; + std::string fDerivedTbView; // for limit uint64_t fLimitStart; diff --git a/dbcon/execplan/functioncolumn.cpp b/dbcon/execplan/functioncolumn.cpp index c815a3cb2..03ef4ab39 100644 --- a/dbcon/execplan/functioncolumn.cpp +++ b/dbcon/execplan/functioncolumn.cpp @@ -426,6 +426,16 @@ void FunctionColumn::setDerivedTable() break; } } + // MCOL-3239 Block for func column with both + // derived table column and normal table column. + else if (derivedTableAlias == "") + { + if (sc->tableAlias().length()) + { + derivedTableAlias = ""; + break; + } + } } fDerivedTable = derivedTableAlias; diff --git a/dbcon/execplan/predicateoperator.h b/dbcon/execplan/predicateoperator.h index d7f650b9a..02a38efe4 100644 --- a/dbcon/execplan/predicateoperator.h +++ b/dbcon/execplan/predicateoperator.h @@ -36,7 +36,6 @@ #endif #include #include -#include #include "expressionparser.h" #include "returnedcolumn.h" @@ -486,20 +485,16 @@ inline bool PredicateOperator::getBoolVal(rowgroup::Row& row, bool& isNull, Retu return !ret; } - // MCOL-1559 - std::string val1 = lop->getStrVal(row, isNull); if (isNull) return false; - std::string val2 = rop->getStrVal(row, isNull); + const std::string& val1 = lop->getStrVal(row, isNull); + if (isNull) return false; - boost::trim_right_if(val1, boost::is_any_of(" ")); - boost::trim_right_if(val2, boost::is_any_of(" ")); - - return strCompare(val1, val2); - } + return strCompare(val1, rop->getStrVal(row, isNull)) && !isNull; + } //FIXME: ??? case execplan::CalpontSystemCatalog::VARBINARY: diff --git a/dbcon/execplan/simplecolumn.cpp b/dbcon/execplan/simplecolumn.cpp index 169b588fb..38ca76fde 100644 --- a/dbcon/execplan/simplecolumn.cpp +++ b/dbcon/execplan/simplecolumn.cpp @@ -212,9 +212,9 @@ const string SimpleColumn::data() const if (!fData.empty()) return fData; else if (!fTableAlias.empty()) - return string(fSchemaName + '.' + fTableAlias + '.' + fColumnName); + return string("`" + fSchemaName + "`.`" + fTableAlias + "`.`" + fColumnName + "`"); - return string(fSchemaName + '.' + fTableName + '.' + fColumnName); + return string("`" + fSchemaName + "`.`" + fTableName + "`.`" + fColumnName + "`"); } SimpleColumn& SimpleColumn::operator=(const SimpleColumn& rhs) diff --git a/dbcon/joblist/crossenginestep.cpp b/dbcon/joblist/crossenginestep.cpp index 07ab611e7..bf44e1a6e 100644 --- a/dbcon/joblist/crossenginestep.cpp +++ b/dbcon/joblist/crossenginestep.cpp @@ -731,17 +731,17 @@ void CrossEngineStep::setProjectBPP(JobStep* jobStep1, JobStep*) else fSelectClause += "SELECT "; - fSelectClause += jobStep1->name(); + fSelectClause += "`" + jobStep1->name() + "`"; } std::string CrossEngineStep::makeQuery() { ostringstream oss; - oss << fSelectClause << " FROM " << fTable; + oss << fSelectClause << " FROM `" << fTable << "`"; if (fTable.compare(fAlias) != 0) - oss << " " << fAlias; + oss << " `" << fAlias << "`"; if (!fWhereClause.empty()) oss << fWhereClause; diff --git a/dbcon/joblist/jlf_common.cpp b/dbcon/joblist/jlf_common.cpp index 2598b1aa7..5b07e21e1 100644 --- a/dbcon/joblist/jlf_common.cpp +++ b/dbcon/joblist/jlf_common.cpp @@ -386,19 +386,57 @@ CalpontSystemCatalog::OID tableOid(const SimpleColumn* sc, boost::shared_ptr(sc); uint32_t pseudoType = (pc) ? pc->pseudoType() : execplan::PSEUDO_UNKNOWN; - return getTupleKey_(jobInfo, sc->oid(), sc->columnName(), extractTableAlias(sc), - sc->schemaName(), sc->viewName(), - ((sc->joinInfo() & execplan::JOIN_CORRELATED) != 0), - pseudoType, (sc->isInfiniDB() ? 0 : 1)); + if (sc == NULL) + { + return -1; + } + + if (add) + { + // setTupleInfo first if add is true, ok if already set. + if (sc->schemaName().empty()) + { + SimpleColumn tmp(*sc, jobInfo.sessionId); + tmp.oid(tableOid(sc, jobInfo.csc) + 1 + sc->colPosition()); + key = getTupleKey(jobInfo, &tmp); // sub-query should be there + } + else + { + CalpontSystemCatalog::ColType ct = sc->colType(); + string alias(extractTableAlias(sc)); + CalpontSystemCatalog::OID tblOid = tableOid(sc, jobInfo.csc); + TupleInfo ti(setTupleInfo(ct, sc->oid(), jobInfo, tblOid, sc, alias)); + key = ti.key; + + CalpontSystemCatalog::OID dictOid = isDictCol(ct); + + if (dictOid > 0) + { + ti = setTupleInfo(ct, dictOid, jobInfo, tblOid, sc, alias); + jobInfo.keyInfo->dictKeyMap[key] = ti.key; + key = ti.key; + } + } + } + else + { + // TupleInfo is expected to be set already + return getTupleKey_(jobInfo, sc->oid(), sc->columnName(), extractTableAlias(sc), + sc->schemaName(), sc->viewName(), + ((sc->joinInfo() & execplan::JOIN_CORRELATED) != 0), + pseudoType, (sc->isInfiniDB() ? 0 : 1)); + } + + return key; } - - + uint32_t getTupleKey(JobInfo& jobInfo, const SRCP& srcp, bool add) { int key = -1; @@ -608,7 +646,7 @@ uint32_t getExpTupleKey(const JobInfo& jobInfo, uint64_t eid, bool cr) } -void addAggregateColumn(AggregateColumn* agc, int idx, RetColsVector& vec, JobInfo& jobInfo) +void addAggregateColumn(ReturnedColumn* agc, int idx, RetColsVector& vec, JobInfo& jobInfo) { uint32_t eid = agc->expressionId(); setExpTupleInfo(agc->resultType(), eid, agc->alias(), jobInfo); diff --git a/dbcon/joblist/jlf_common.h b/dbcon/joblist/jlf_common.h index 48cdaad5b..6d8456a4b 100644 --- a/dbcon/joblist/jlf_common.h +++ b/dbcon/joblist/jlf_common.h @@ -402,8 +402,9 @@ execplan::CalpontSystemCatalog::OID tableOid(const execplan::SimpleColumn* sc, /** @brief Returns the unique ID to be used in tupleInfo * */ -uint32_t getTupleKey(const JobInfo& jobInfo, - const execplan::SimpleColumn* sc); +uint32_t getTupleKey(JobInfo& jobInfo, + const execplan::SimpleColumn* sc, + bool add = false); uint32_t getTableKey(const JobInfo& jobInfo, execplan::CalpontSystemCatalog::OID tableOid, const std::string& alias, @@ -465,7 +466,7 @@ TupleInfo setExpTupleInfo(const execplan::ReturnedColumn* rc, JobInfo& jobInfo); /** @brief add an aggregate column info * */ -void addAggregateColumn(execplan::AggregateColumn*, int, RetColsVector&, JobInfo&); +void addAggregateColumn(execplan::ReturnedColumn*, int, RetColsVector&, JobInfo&); void makeJobSteps(execplan::CalpontSelectExecutionPlan* csep, JobInfo& jobInfo, JobStepVector& querySteps, JobStepVector& projectSteps, diff --git a/dbcon/joblist/jlf_execplantojoblist.cpp b/dbcon/joblist/jlf_execplantojoblist.cpp index 54cfaf726..5049fde79 100644 --- a/dbcon/joblist/jlf_execplantojoblist.cpp +++ b/dbcon/joblist/jlf_execplantojoblist.cpp @@ -1644,6 +1644,7 @@ const JobStepVector doSimpleFilter(SimpleFilter* sf, JobInfo& jobInfo) string constval(cc->constval()); + CalpontSystemCatalog::OID dictOid = 0; CalpontSystemCatalog::ColType ct = sc->colType(); const PseudoColumn* pc = dynamic_cast(sc); diff --git a/dbcon/joblist/joblistfactory.cpp b/dbcon/joblist/joblistfactory.cpp index a0f575f01..9af171dea 100644 --- a/dbcon/joblist/joblistfactory.cpp +++ b/dbcon/joblist/joblistfactory.cpp @@ -275,9 +275,11 @@ const JobStepVector doProject(const RetColsVector& retCols, JobInfo& jobInfo) if (retCols[i]->windowfunctionColumnList().size() > 0) jobInfo.expressionVec.push_back(key); - else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(), key) == - jobInfo.expressionVec.end()) + else if (find(jobInfo.expressionVec.begin(), jobInfo.expressionVec.end(), key) + == jobInfo.expressionVec.end()) + { jobInfo.returnedExpressions.push_back(sjstep); + } //put place hold column in projection list jobInfo.pjColList.push_back(ti); @@ -427,7 +429,7 @@ void preProcessFunctionOnAggregation(const vector& scs, } } - // append the aggregate columns in arithmetic/function cloulmn to the projection list + // append the aggregate columns in arithmetic/function column to the projection list for (vector::const_iterator i = aggs.begin(); i != aggs.end(); i++) { addAggregateColumn(*i, -1, jobInfo.projectionCols, jobInfo); @@ -1043,16 +1045,21 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo const FunctionColumn* fc = NULL; const WindowFunctionColumn* wc = NULL; bool hasAggCols = false; + bool hasWndCols = false; if ((ac = dynamic_cast(srcp.get())) != NULL) { if (ac->aggColumnList().size() > 0) hasAggCols = true; + if (ac->windowfunctionColumnList().size() > 0) + hasWndCols = true; } else if ((fc = dynamic_cast(srcp.get())) != NULL) { if (fc->aggColumnList().size() > 0) hasAggCols = true; + if (fc->windowfunctionColumnList().size() > 0) + hasWndCols = true; } else if (dynamic_cast(srcp.get()) != NULL) { @@ -1077,7 +1084,7 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo TupleInfo ti(setExpTupleInfo(ct, eid, srcp.get()->alias(), jobInfo)); tupleKey = ti.key; - if (hasAggCols) + if (hasAggCols && !hasWndCols) jobInfo.expressionVec.push_back(tupleKey); } @@ -1191,16 +1198,21 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo const FunctionColumn* fc = NULL; const WindowFunctionColumn* wc = NULL; bool hasAggCols = false; + bool hasWndCols = false; if ((ac = dynamic_cast(srcp.get())) != NULL) { if (ac->aggColumnList().size() > 0) hasAggCols = true; + if (ac->windowfunctionColumnList().size() > 0) + hasWndCols = true; } else if ((fc = dynamic_cast(srcp.get())) != NULL) { if (fc->aggColumnList().size() > 0) hasAggCols = true; + if (fc->windowfunctionColumnList().size() > 0) + hasWndCols = true; } else if (dynamic_cast(srcp.get()) != NULL) { @@ -1225,7 +1237,7 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo TupleInfo ti(setExpTupleInfo(ct, eid, srcp.get()->alias(), jobInfo)); tupleKey = ti.key; - if (hasAggCols) + if (hasAggCols && !hasWndCols) jobInfo.expressionVec.push_back(tupleKey); } diff --git a/dbcon/joblist/lbidlist.cpp b/dbcon/joblist/lbidlist.cpp index 9ef9c8a11..631e9a597 100644 --- a/dbcon/joblist/lbidlist.cpp +++ b/dbcon/joblist/lbidlist.cpp @@ -750,7 +750,6 @@ bool LBIDList::CasualPartitionPredicate(const int64_t Min, int64_t tMax = Max; dataconvert::DataConvert::trimWhitespace(tMin); dataconvert::DataConvert::trimWhitespace(tMax); - dataconvert::DataConvert::trimWhitespace(value); scan = compareVal(order_swap(tMin), order_swap(tMax), order_swap(value), op, lcf); diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp index 548eecad1..81d190644 100644 --- a/dbcon/joblist/tupleaggregatestep.cpp +++ b/dbcon/joblist/tupleaggregatestep.cpp @@ -1410,7 +1410,7 @@ void TupleAggregateStep::prep1PhaseAggregate( typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE); precisionAgg.push_back(-1); widthAgg.push_back(sizeof(long double)); - scaleAgg.push_back(scaleProj[colProj]); + scaleAgg.push_back(0); } break; @@ -1588,8 +1588,8 @@ void TupleAggregateStep::prep1PhaseAggregate( } functionVec[i]->fAuxColumnIndex = lastCol++; - oidsAgg.push_back(oidsAgg[j]); - keysAgg.push_back(keysAgg[j]); + oidsAgg.push_back(oidsProj[j]); + keysAgg.push_back(keysProj[j]); scaleAgg.push_back(0); precisionAgg.push_back(0); precisionAgg.push_back(0); @@ -1604,8 +1604,8 @@ void TupleAggregateStep::prep1PhaseAggregate( functionVec[i]->fAuxColumnIndex = lastCol; // sum(x) - oidsAgg.push_back(oidsAgg[j]); - keysAgg.push_back(keysAgg[j]); + oidsAgg.push_back(oidsProj[j]); + keysAgg.push_back(keysProj[j]); scaleAgg.push_back(0); precisionAgg.push_back(-1); typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE); @@ -1613,8 +1613,8 @@ void TupleAggregateStep::prep1PhaseAggregate( ++lastCol; // sum(x**2) - oidsAgg.push_back(oidsAgg[j]); - keysAgg.push_back(keysAgg[j]); + oidsAgg.push_back(oidsProj[j]); + keysAgg.push_back(keysProj[j]); scaleAgg.push_back(0); precisionAgg.push_back(-1); typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE); @@ -1942,7 +1942,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( typeAgg.push_back(CalpontSystemCatalog::LONGDOUBLE); precisionAgg.push_back(-1); widthAgg.push_back(sizeof(long double)); - scaleAgg.push_back(scaleProj[colProj]); + scaleAgg.push_back(0); colAgg++; // has distinct step, put the count column for avg next to the sum @@ -2264,7 +2264,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE); precisionAggDist.push_back(-1); widthAggDist.push_back(sizeof(long double)); - scaleAggDist.push_back(scaleProj[colAgg]); + scaleAggDist.push_back(0); } break; @@ -2336,10 +2336,10 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( { oidsAggDist.push_back(oidsAgg[colAgg]); keysAggDist.push_back(retKey); - scaleAggDist.push_back(scaleAgg[colAgg] >> 8); - precisionAggDist.push_back(precisionAgg[colAgg]); - typeAggDist.push_back(typeAgg[colAgg]); - widthAggDist.push_back(widthAgg[colAgg]); + scaleAggDist.push_back(0); + typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE); + precisionAggDist.push_back(-1); + widthAggDist.push_back(sizeof(long double)); } else { @@ -2631,8 +2631,8 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( functionVec2[i]->fAuxColumnIndex = lastCol; // sum(x) - oidsAggDist.push_back(oidsAggDist[j]); - keysAggDist.push_back(keysAggDist[j]); + oidsAggDist.push_back(oidsAgg[j]); + keysAggDist.push_back(keysAgg[j]); scaleAggDist.push_back(0); precisionAggDist.push_back(0); typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE); @@ -2640,8 +2640,8 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( ++lastCol; // sum(x**2) - oidsAggDist.push_back(oidsAggDist[j]); - keysAggDist.push_back(keysAggDist[j]); + oidsAggDist.push_back(oidsAgg[j]); + keysAggDist.push_back(keysAgg[j]); scaleAggDist.push_back(0); precisionAggDist.push_back(-1); typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE); @@ -3188,7 +3188,7 @@ void TupleAggregateStep::prep2PhasesAggregate( oidsAggPm.push_back(oidsProj[colProj]); keysAggPm.push_back(aggKey); typeAggPm.push_back(CalpontSystemCatalog::LONGDOUBLE); - scaleAggPm.push_back(scaleProj[colProj]); + scaleAggPm.push_back(0); precisionAggPm.push_back(-1); widthAggPm.push_back(sizeof(long double)); colAggPm++; @@ -3465,10 +3465,10 @@ void TupleAggregateStep::prep2PhasesAggregate( { oidsAggUm.push_back(oidsAggPm[colPm]); keysAggUm.push_back(retKey); - scaleAggUm.push_back(scaleAggPm[colPm] >> 8); - precisionAggUm.push_back(precisionAggPm[colPm]); - typeAggUm.push_back(typeAggPm[colPm]); - widthAggUm.push_back(widthAggPm[colPm]); + scaleAggUm.push_back(0); + typeAggUm.push_back(CalpontSystemCatalog::LONGDOUBLE); + precisionAggUm.push_back(-1); + widthAggUm.push_back(sizeof(long double)); } else { @@ -3683,8 +3683,8 @@ void TupleAggregateStep::prep2PhasesAggregate( functionVecUm[i]->fAuxColumnIndex = lastCol; // sum(x) - oidsAggUm.push_back(oidsAggUm[j]); - keysAggUm.push_back(keysAggUm[j]); + oidsAggUm.push_back(oidsAggPm[j]); + keysAggUm.push_back(keysAggPm[j]); scaleAggUm.push_back(0); precisionAggUm.push_back(-1); typeAggUm.push_back(CalpontSystemCatalog::LONGDOUBLE); @@ -3692,8 +3692,8 @@ void TupleAggregateStep::prep2PhasesAggregate( ++lastCol; // sum(x**2) - oidsAggUm.push_back(oidsAggUm[j]); - keysAggUm.push_back(keysAggUm[j]); + oidsAggUm.push_back(oidsAggPm[j]); + keysAggUm.push_back(keysAggPm[j]); scaleAggUm.push_back(0); precisionAggUm.push_back(-1); typeAggUm.push_back(CalpontSystemCatalog::LONGDOUBLE); @@ -4046,7 +4046,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( typeAggPm.push_back(CalpontSystemCatalog::LONGDOUBLE); precisionAggPm.push_back(-1); widthAggPm.push_back(sizeof(long double)); - scaleAggPm.push_back(scaleProj[colProj]); + scaleAggPm.push_back(0); colAggPm++; } @@ -4415,7 +4415,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE); precisionAggDist.push_back(-1); widthAggDist.push_back(sizeof(long double)); - scaleAggDist.push_back(scaleAggUm[colUm]); + scaleAggDist.push_back(0); } // PM: put the count column for avg next to the sum // let fall through to add a count column for average function @@ -4477,10 +4477,10 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( { oidsAggDist.push_back(oidsAggUm[colUm]); keysAggDist.push_back(retKey); - scaleAggDist.push_back(scaleAggUm[colUm] >> 8); - precisionAggDist.push_back(precisionAggUm[colUm]); - typeAggDist.push_back(typeAggUm[colUm]); - widthAggDist.push_back(widthAggUm[colUm]); + scaleAggDist.push_back(0); + typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE); + precisionAggDist.push_back(-1); + widthAggDist.push_back(sizeof(long double)); } else { @@ -4725,8 +4725,8 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( functionVecUm[i]->fAuxColumnIndex = lastCol; // sum(x) - oidsAggDist.push_back(oidsAggDist[j]); - keysAggDist.push_back(keysAggDist[j]); + oidsAggDist.push_back(oidsAggPm[j]); + keysAggDist.push_back(keysAggPm[j]); scaleAggDist.push_back(0); precisionAggDist.push_back(-1); typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE); @@ -4734,8 +4734,8 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( ++lastCol; // sum(x**2) - oidsAggDist.push_back(oidsAggDist[j]); - keysAggDist.push_back(keysAggDist[j]); + oidsAggDist.push_back(oidsAggPm[j]); + keysAggDist.push_back(keysAggPm[j]); scaleAggDist.push_back(0); precisionAggDist.push_back(-1); typeAggDist.push_back(CalpontSystemCatalog::LONGDOUBLE); @@ -5038,7 +5038,8 @@ void TupleAggregateStep::prepExpressionOnAggregate(SP_ROWAGG_UM_t& aggUM, JobInf uint64_t eid = -1; if (((ac = dynamic_cast(it->get())) != NULL) && - (ac->aggColumnList().size() > 0)) + (ac->aggColumnList().size() > 0) && + (ac->windowfunctionColumnList().size() == 0)) { const vector& scols = ac->simpleColumnList(); simpleColumns.insert(simpleColumns.end(), scols.begin(), scols.end()); @@ -5047,7 +5048,8 @@ void TupleAggregateStep::prepExpressionOnAggregate(SP_ROWAGG_UM_t& aggUM, JobInf expressionVec.push_back(*it); } else if (((fc = dynamic_cast(it->get())) != NULL) && - (fc->aggColumnList().size() > 0)) + (fc->aggColumnList().size() > 0) && + (fc->windowfunctionColumnList().size() == 0)) { const vector& sCols = fc->simpleColumnList(); simpleColumns.insert(simpleColumns.end(), sCols.begin(), sCols.end()); diff --git a/dbcon/joblist/windowfunctionstep.cpp b/dbcon/joblist/windowfunctionstep.cpp index 610026e33..b145bfa12 100644 --- a/dbcon/joblist/windowfunctionstep.cpp +++ b/dbcon/joblist/windowfunctionstep.cpp @@ -320,6 +320,41 @@ const string WindowFunctionStep::toString() const return oss.str(); } +void WindowFunctionStep::AddSimplColumn(const vector& scs, + JobInfo& jobInfo) +{ + // append the simple columns if not already projected + set scProjected; + + for (RetColsVector::iterator i = jobInfo.projectionCols.begin(); + i != jobInfo.projectionCols.end(); + i++) + { + SimpleColumn* sc = dynamic_cast(i->get()); + + if (sc != NULL) + { + if (sc->schemaName().empty()) + sc->oid(joblist::tableOid(sc, jobInfo.csc) + 1 + sc->colPosition()); + + scProjected.insert(UniqId(sc)); + } + } + + for (vector::const_iterator i = scs.begin(); i != scs.end(); i++) + { + if (scProjected.find(UniqId(*i)) == scProjected.end()) + { + jobInfo.windowDels.push_back(SRCP((*i)->clone())); +// MCOL-3343 Enable this if we decide to allow Window Functions to run with +// aggregates with no group by. MariaDB allows this. Nobody else in the world does. +// There will be more work to get it to function if we try this. +// jobInfo.windowSet.insert(getTupleKey(jobInfo, *i, true)); + scProjected.insert(UniqId(*i)); + } + } +} + void WindowFunctionStep::checkWindowFunction(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo) { // window functions in select clause, selected or in expression @@ -404,6 +439,23 @@ void WindowFunctionStep::checkWindowFunction(CalpontSelectExecutionPlan* csep, J if (jobInfo.windowCols.empty()) return; + // Add in the non-window side of arithmetic columns and functions + for (uint64_t i = 0; i < jobInfo.windowExps.size(); i++) + { + const ArithmeticColumn* ac = + dynamic_cast(jobInfo.windowExps[i].get()); + const FunctionColumn* fc = + dynamic_cast(jobInfo.windowExps[i].get()); + + if (ac != NULL && ac->windowfunctionColumnList().size() > 0) + { + AddSimplColumn(ac->simpleColumnList(), jobInfo); + } + else if (fc != NULL && fc->windowfunctionColumnList().size() > 0) + { + AddSimplColumn(fc->simpleColumnList(), jobInfo); + } + } // reconstruct the delivered column list with auxiliary columns set colSet; jobInfo.deliveredCols.resize(0); @@ -445,7 +497,13 @@ void WindowFunctionStep::checkWindowFunction(CalpontSelectExecutionPlan* csep, J key = getTupleKey(jobInfo, *j, true); if (colSet.find(key) == colSet.end()) + { jobInfo.deliveredCols.push_back(*j); +// MCOL-3343 Enable this if we decide to allow Window Functions to run with +// aggregates with no group by. MariaDB allows this. Nobody else in the world does. +// There will be more work to get it to function if we try this. +// jobInfo.windowSet.insert(getTupleKey(jobInfo, *j, true)); + } colSet.insert(key); } diff --git a/dbcon/joblist/windowfunctionstep.h b/dbcon/joblist/windowfunctionstep.h index b57d06324..eaa020fc3 100644 --- a/dbcon/joblist/windowfunctionstep.h +++ b/dbcon/joblist/windowfunctionstep.h @@ -148,6 +148,8 @@ private: void formatMiniStats(); void printCalTrace(); + static void AddSimplColumn(const std::vector& scs, JobInfo& jobInfo); + class Runner { public: diff --git a/dbcon/mysql/columnstore_info.sql b/dbcon/mysql/columnstore_info.sql index 7655b5f16..b2f5e50d1 100644 --- a/dbcon/mysql/columnstore_info.sql +++ b/dbcon/mysql/columnstore_info.sql @@ -101,7 +101,7 @@ END // create procedure columnstore_upgrade() `columnstore_upgrade`: BEGIN DECLARE done INTEGER DEFAULT 0; - DECLARE schema_table VARCHAR(100) DEFAULT ""; + DECLARE schema_table VARCHAR(100) CHARACTER SET utf8 DEFAULT ""; DECLARE table_list CURSOR FOR select concat('`', table_schema,'`.`',table_name,'`') from information_schema.tables where engine='columnstore'; DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = 1; OPEN table_list; @@ -115,6 +115,5 @@ create procedure columnstore_upgrade() DEALLOCATE PREPARE stmt; END LOOP; END // -delimiter ; DELIMITER ; diff --git a/dbcon/mysql/ha_calpont_dml.cpp b/dbcon/mysql/ha_calpont_dml.cpp index 156babf41..cb1f3853e 100644 --- a/dbcon/mysql/ha_calpont_dml.cpp +++ b/dbcon/mysql/ha_calpont_dml.cpp @@ -1043,7 +1043,6 @@ int ha_calpont_impl_write_batch_row_(uchar* buf, TABLE* table, cal_impl_if::cal_ dataLength = *(uint16_t*) buf; buf = buf + 2 ; } - escape.assign((char*)buf, dataLength); boost::replace_all(escape, "\\", "\\\\"); fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(), escape.c_str(), ci.enclosed_by, ci.delimiter); @@ -1061,22 +1060,16 @@ int ha_calpont_impl_write_batch_row_(uchar* buf, TABLE* table, cal_impl_if::cal_ buf = buf + 2 ; } - if ( dataLength > ci.columnTypes[colpos].colWidth) - dataLength = ci.columnTypes[colpos].colWidth; - escape.assign((char*)buf, dataLength); boost::replace_all(escape, "\\", "\\\\"); fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(), escape.c_str(), ci.enclosed_by, ci.delimiter); } } - - //buf += ci.columnTypes[colpos].colWidth; if (ci.utf8) buf += (ci.columnTypes[colpos].colWidth * 3); else buf += ci.columnTypes[colpos].colWidth; - break; } diff --git a/dbcon/mysql/ha_calpont_execplan.cpp b/dbcon/mysql/ha_calpont_execplan.cpp index 1d8a3f47e..55b389b50 100644 --- a/dbcon/mysql/ha_calpont_execplan.cpp +++ b/dbcon/mysql/ha_calpont_execplan.cpp @@ -2363,8 +2363,15 @@ SimpleColumn* buildSimpleColFromDerivedTable(gp_walk_info& gwi, Item_field* ifp) sc->colPosition(j); string tableAlias(csep->derivedTbAlias()); sc->tableAlias(lower(tableAlias)); - sc->viewName(lower(viewName)); sc->timeZone(gwi.thd->variables.time_zone->get_name()->ptr()); + if (!viewName.empty()) + { + sc->viewName(viewName); + } + else + { + sc->viewName(csep->derivedTbView()); + } sc->resultType(cols[j]->resultType()); sc->hasAggregate(cols[j]->hasAggregate()); diff --git a/dbcon/mysql/ha_calpont_impl.cpp b/dbcon/mysql/ha_calpont_impl.cpp index b961ca553..14aa1d61d 100644 --- a/dbcon/mysql/ha_calpont_impl.cpp +++ b/dbcon/mysql/ha_calpont_impl.cpp @@ -1340,16 +1340,13 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) else schemaName = string(item->db_name); + columnAssignmentPtr = new ColumnAssignment(item->name.str, "=", ""); if (item->field_type() == MYSQL_TYPE_TIMESTAMP || item->field_type() == MYSQL_TYPE_TIMESTAMP2) { timeStampColumnNames.insert(string(item->name.str)); } - columnAssignmentPtr = new ColumnAssignment(); - columnAssignmentPtr->fColumn = string(item->name.str); - columnAssignmentPtr->fOperator = "="; - columnAssignmentPtr->fFuncScale = 0; Item* value = value_it++; if (value->type() == Item::CONST_ITEM) @@ -1468,8 +1465,9 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) else if ( value->type() == Item::NULL_ITEM ) { // dmlStmt += "NULL"; - columnAssignmentPtr->fScalarExpression = "NULL"; + columnAssignmentPtr->fScalarExpression = ""; columnAssignmentPtr->fFromCol = false; + columnAssignmentPtr->fIsNull = true; } else if ( value->type() == Item::SUBSELECT_ITEM ) { @@ -1535,11 +1533,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) { if (timeStampColumnNames.find(onUpdateTimeStampColumns[i]) == timeStampColumnNames.end()) { - columnAssignmentPtr = new ColumnAssignment(); - columnAssignmentPtr->fColumn = string(onUpdateTimeStampColumns[i]); - columnAssignmentPtr->fOperator = "="; - columnAssignmentPtr->fFuncScale = 0; - columnAssignmentPtr->fFromCol = false; + columnAssignmentPtr = new ColumnAssignment(string(onUpdateTimeStampColumns[i]), "=", ""); struct timeval tv; char buf[64]; gettimeofday(&tv, 0); diff --git a/dbcon/mysql/ha_from_sub.cpp b/dbcon/mysql/ha_from_sub.cpp index 4d2553511..0093330aa 100644 --- a/dbcon/mysql/ha_from_sub.cpp +++ b/dbcon/mysql/ha_from_sub.cpp @@ -347,6 +347,7 @@ SCSEP FromSubQuery::transform() gwi.subQuery = this; gwi.viewName = fGwip.viewName; csep->derivedTbAlias(fAlias); // always lower case + csep->derivedTbView(fGwip.viewName.alias); if (getSelectPlan(gwi, *fFromSub, csep, fPushdownHand) != 0) { diff --git a/dmlproc/dmlproc.cpp b/dmlproc/dmlproc.cpp index a3e6fa713..09e5b9128 100644 --- a/dmlproc/dmlproc.cpp +++ b/dmlproc/dmlproc.cpp @@ -86,6 +86,8 @@ using namespace joblist; namespace fs = boost::filesystem; +threadpool::ThreadPool DMLServer::fDmlPackagepool(10, 0); + namespace { DistributedEngineComm* Dec; @@ -605,7 +607,7 @@ int main(int argc, char* argv[]) int temp; int serverThreads = 10; - int serverQueueSize = 50; + int serverQueueSize = 0; const string DMLProc("DMLProc"); temp = toInt(cf->getConfig(DMLProc, "ServerThreads")); @@ -613,10 +615,9 @@ int main(int argc, char* argv[]) if (temp > 0) serverThreads = temp; - temp = toInt(cf->getConfig(DMLProc, "ServerQueueSize")); - - if (temp > 0) - serverQueueSize = temp; +// temp = toInt(cf->getConfig(DMLProc, "ServerQueueSize")); +// if (temp > 0) +// serverQueueSize = temp; //read and cleanup port before trying to use try @@ -656,6 +657,8 @@ int main(int argc, char* argv[]) { JobStep::jobstepThreadPool.setDebug(true); JobStep::jobstepThreadPool.invoke(threadpool::ThreadPoolMonitor(&JobStep::jobstepThreadPool)); + DMLServer::fDmlPackagepool.setDebug(true); + DMLServer::fDmlPackagepool.invoke(threadpool::ThreadPoolMonitor(&DMLServer::fDmlPackagepool)); } //set ACTIVE state diff --git a/dmlproc/dmlprocessor.cpp b/dmlproc/dmlprocessor.cpp index 861198d45..9e1601a36 100644 --- a/dmlproc/dmlprocessor.cpp +++ b/dmlproc/dmlprocessor.cpp @@ -349,7 +349,7 @@ PackageHandler::~PackageHandler() // Rollback will most likely be next. // // A tranasaction for one fTableOid is not blocked by a txn for a different fTableOid. -int PackageHandler::synchTableAccess() +int PackageHandler::synchTableAccess(dmlpackage::CalpontDMLPackage* dmlPackage) { // MCOL-140 Wait for any other DML using this table. std::map::iterator it; @@ -393,10 +393,27 @@ int PackageHandler::synchTableAccess() // tableOidQueue here is the queue holding the waitng transactions for this fTableOid while (true) { + // Log something that we're waiting + LoggingID logid(21, fSessionID, fTxnid); + logging::Message::Args args1; + logging::Message msg(1); + ostringstream oss; + oss << "Txn is waiting for" << tableOidQueue.front() << " " << dmlPackage->get_SQLStatement() << "; |" << dmlPackage->get_SchemaName() <<"|"; + args1.add(oss.str()); + args1.add((uint64_t)fTableOid); + msg.format(args1); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_DEBUG, msg, logid); + tableOidCond.wait(lock); - + // In case of CTRL+C, the tableOidQueue could be invalidated + if ((tableOidMap.find(fTableOid))->second != tableOidQueue) + { + break; + } if (tableOidQueue.front() == fTxnid) { + // We're up next. Let's go do stuff. break; } @@ -439,7 +456,6 @@ int PackageHandler::releaseTableAccess() if (fTableOid == 0 || (it = tableOidMap.find(fTableOid)) == tableOidMap.end()) { - // This will happen for DML_COMMAND, as we never got the tableoid or called synchTableAccess return 2; // For now, return codes are not used } @@ -461,7 +477,8 @@ int PackageHandler::releaseTableAccess() } else { - tableOidQueue.pop(); // Get off the waiting list. + if (!tableOidQueue.empty()) + tableOidQueue.pop(); // Get off the waiting list. if (tableOidQueue.empty()) { @@ -477,7 +494,7 @@ int PackageHandler::releaseTableAccess() int PackageHandler::forceReleaseTableAccess() { - // By removing the tcnid from the queue, the logic after the wait in + // By removing the txnid from the queue, the logic after the wait in // synchTableAccess() will release the thread and clean up if needed. std::map::iterator it; boost::lock_guard lock(tableOidMutex); @@ -490,6 +507,11 @@ int PackageHandler::forceReleaseTableAccess() PackageHandler::tableAccessQueue_t& tableOidQueue = it->second; tableOidQueue.erase(fTxnid); + if (tableOidQueue.empty()) + { + // remove the queue from the map. + tableOidMap.erase(fTableOid); + } // release the condition tableOidCond.notify_all(); return 1; @@ -513,7 +535,8 @@ void PackageHandler::run() std::string stmt; unsigned DMLLoggingId = 21; oam::OamCache* oamCache = oam::OamCache::makeOamCache(); - + SynchTable synchTable; + try { switch ( fPackageType ) @@ -542,8 +565,7 @@ void PackageHandler::run() CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); fTableOid = roPair.objnum; } - - synchTableAccess(); // Blocks if another DML thread is using this fTableOid + synchTable.setPackage(this, &insertPkg); // Blocks if another DML thread is using this fTableOid } #endif @@ -921,8 +943,7 @@ void PackageHandler::run() CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); fTableOid = roPair.objnum; } - - synchTableAccess(); // Blocks if another DML thread is using this fTableOid + synchTable.setPackage(this, updatePkg.get()); // Blocks if another DML thread is using this fTableOid } #endif @@ -981,8 +1002,7 @@ void PackageHandler::run() CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); fTableOid = roPair.objnum; } - - synchTableAccess(); // Blocks if another DML thread is using this fTableOid + synchTable.setPackage(this, deletePkg.get()); // Blocks if another DML thread is using this fTableOid } #endif @@ -1049,15 +1069,8 @@ void PackageHandler::run() break; } -#ifdef MCOL_140 - if (fConcurrentSupport) - { - // MCOL-140 We're done. release the next waiting txn for this fTableOid - releaseTableAccess(); - } -#endif //Log errors if ( (result.result != dmlpackageprocessor::DMLPackageProcessor::NO_ERROR) @@ -1080,15 +1093,8 @@ void PackageHandler::run() } catch (std::exception& e) { -#ifdef MCOL_140 - if (fConcurrentSupport) - { - // MCOL-140 We're done. release the next waiting txn for this fTableOid - releaseTableAccess(); - } -#endif cout << "dmlprocessor.cpp PackageHandler::run() package type(" << fPackageType << ") exception: " << e.what() << endl; logging::LoggingID lid(21); @@ -1105,15 +1111,8 @@ void PackageHandler::run() } catch (...) { -#ifdef MCOL_140 - if (fConcurrentSupport) - { - // MCOL-140 We're done. release the next waiting txn for this fTableOid - releaseTableAccess(); - } -#endif logging::LoggingID lid(21); logging::MessageLog ml(lid); logging::Message::Args args; @@ -1157,9 +1156,17 @@ void PackageHandler::run() void PackageHandler::rollbackPending() { + if (fProcessor.get() == NULL) + { + // This happens when batch insert + return; + } + + fProcessor->setRollbackPending(true); + // Force a release of the processing from MCOL-140 #ifdef MCOL_140 - if (fConcurrentSupport) + if (fConcurrentSupport) { // MCOL-140 We're not necessarily the next in line. // This forces this thread to be released anyway. @@ -1168,14 +1175,6 @@ void PackageHandler::rollbackPending() #endif - if (fProcessor.get() == NULL) - { - // This happens when batch insert - return; - } - - fProcessor->setRollbackPending(true); - ostringstream oss; oss << "PackageHandler::rollbackPending: Processing DMLPackage."; DMLProcessor::log(oss.str(), logging::LOG_TYPE_DEBUG); diff --git a/dmlproc/dmlprocessor.h b/dmlproc/dmlprocessor.h index 60ba8a14b..6a18493b1 100644 --- a/dmlproc/dmlprocessor.h +++ b/dmlproc/dmlprocessor.h @@ -151,15 +151,16 @@ private: DMLServer(const DMLServer& rhs); DMLServer& operator=(const DMLServer& rhs); - /** @brief the thread pool for processing dml packages - */ - threadpool::ThreadPool fDmlPackagepool; - int fPackageMaxThreads; /** @brief max number of threads to process dml packages */ int fPackageWorkQueueSize; /** @brief max number of packages waiting in the work queue */ boost::scoped_ptr fMqServer; BRM::DBRM* fDbrm; + +public: + /** @brief the thread pool for processing dml packages + */ + static threadpool::ThreadPool fDmlPackagepool; }; /** @brief Thread to process a single dml package. @@ -207,12 +208,12 @@ private: // Used to serialize operations because the VSS can't handle inserts // or updates on the same block. // When an Insert, Update or Delete command arrives, we look here - // for the table oid. If found, wait until it is no onger here. + // for the table oid. If found, wait until it is no longer here. // If this transactionID (SCN) is < the transactionID in the table, don't delay // and hope for the best, as we're already out of order. // When the VSS is engineered to handle transactions out of order, all MCOL-140 // code is to be removed. - int synchTableAccess(); + int synchTableAccess(dmlpackage::CalpontDMLPackage* dmlPackage); int releaseTableAccess(); int forceReleaseTableAccess(); typedef iterable_queue tableAccessQueue_t; @@ -221,6 +222,35 @@ private: static boost::mutex tableOidMutex; public: static int clearTableAccess(); + + // MCOL-3296 Add a class to call synchTableAccess on creation and + // releaseTableAccess on destuction for exception safeness. + class SynchTable + { + public: + SynchTable() : fphp(NULL) {}; + SynchTable(PackageHandler* php, dmlpackage::CalpontDMLPackage* dmlPackage) + { + setPackage(php, dmlPackage); + } + ~SynchTable() + { + if (fphp) + fphp->releaseTableAccess(); + } + bool setPackage(PackageHandler* php, dmlpackage::CalpontDMLPackage* dmlPackage) + { + if (fphp) + fphp->releaseTableAccess(); + fphp = php; + if (fphp) + fphp->synchTableAccess(dmlPackage); + return true; + } + private: + PackageHandler* fphp; + }; + }; /** @brief processes dml packages as they arrive diff --git a/oamapps/mcsadmin/mcsadmin.cpp b/oamapps/mcsadmin/mcsadmin.cpp index c0507aadd..67907ceb9 100644 --- a/oamapps/mcsadmin/mcsadmin.cpp +++ b/oamapps/mcsadmin/mcsadmin.cpp @@ -899,9 +899,9 @@ int processCommand(string* arguments) SendToWES(oam, bs); } #if _MSC_VER - if (_strnicmp(arguments[1].c_str(), "stop", 4) == 0)) + else if (_strnicmp(arguments[1].c_str(), "stop", 4) == 0)) #else - if (strncasecmp(arguments[1].c_str(), "stop", 4) == 0) + else if (strncasecmp(arguments[1].c_str(), "stop", 4) == 0) #endif { ByteStream bs; @@ -913,9 +913,9 @@ int processCommand(string* arguments) SendToWES(oam, bs); } #if _MSC_VER - if (_strnicmp(arguments[1].c_str(), "status", 6) == 0)) + else if (_strnicmp(arguments[1].c_str(), "status", 6) == 0)) #else - if (strncasecmp(arguments[1].c_str(), "status", 6) == 0) + else if (strncasecmp(arguments[1].c_str(), "status", 6) == 0) #endif { ByteStream bs; diff --git a/oamapps/postConfigure/postConfigure.cpp b/oamapps/postConfigure/postConfigure.cpp index 90649a2bb..4a13d783c 100644 --- a/oamapps/postConfigure/postConfigure.cpp +++ b/oamapps/postConfigure/postConfigure.cpp @@ -5338,7 +5338,7 @@ bool storageSetup(bool amazonInstall) if ( (glusterInstalled == "y" && singleServerInstall != "1") && hadoopInstalled == "y" ) { - cout << "There are 5 options when configuring the storage: internal, external, DataRedundancy, or hdfs" << endl << endl; + cout << "There are 4 options when configuring the storage: internal, external, DataRedundancy, or hdfs" << endl << endl; prompt = "Select the type of Data Storage [1=internal, 2=external, 3=DataRedundancy, 4=hdfs] (" + storageType + ") > "; } diff --git a/oamapps/serverMonitor/diskMonitor.cpp b/oamapps/serverMonitor/diskMonitor.cpp index 7b1e13d5d..90c99e56d 100644 --- a/oamapps/serverMonitor/diskMonitor.cpp +++ b/oamapps/serverMonitor/diskMonitor.cpp @@ -267,7 +267,7 @@ void diskMonitor() blksize = buf.f_bsize; blocks = buf.f_blocks; - freeblks = buf.f_bfree; + freeblks = buf.f_bavail; totalBlocks = blocks * blksize; free = freeblks * blksize; @@ -396,7 +396,7 @@ void diskMonitor() blksize = buf.f_bsize; blocks = buf.f_blocks; - freeblks = buf.f_bfree; + freeblks = buf.f_bavail; totalBlocks = blocks * blksize; free = freeblks * blksize; diff --git a/primitives/primproc/dictstep.cpp b/primitives/primproc/dictstep.cpp index 47bfeac25..abd99ada3 100644 --- a/primitives/primproc/dictstep.cpp +++ b/primitives/primproc/dictstep.cpp @@ -30,7 +30,6 @@ #include #include -#include #include "bpp.h" #include "primitiveserver.h" @@ -94,7 +93,6 @@ void DictStep::createCommand(ByteStream& bs) for (uint32_t i = 0; i < filterCount; i++) { bs >> strTmp; - boost::trim_right_if(strTmp, boost::is_any_of(" ")); //cout << " " << strTmp << endl; eqFilter->insert(strTmp); } diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index b79aabf04..f64e38a4a 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -28,7 +28,6 @@ #include #include #include -#include //#define NDEBUG #include #include @@ -1778,7 +1777,6 @@ private: for (i = 0; i < count; i++) { *bs >> str; - boost::trim_right_if(str, boost::is_any_of(" ")); filter->insert(str); } diff --git a/tools/configMgt/autoConfigure.cpp b/tools/configMgt/autoConfigure.cpp index f6d7b51e2..7e910904b 100644 --- a/tools/configMgt/autoConfigure.cpp +++ b/tools/configMgt/autoConfigure.cpp @@ -360,7 +360,19 @@ int main(int argc, char* argv[]) exit(-1); } - //setup System Language + // WaitPeriod + try + { + string waitPeriod = sysConfigOld->getConfig(SystemSection, "WaitPeriod"); + if (waitPeriod.length() > 0) + { + sysConfigNew->setConfig(SystemSection, "WaitPeriod", waitPeriod); + } + } + catch (...) + { } + + //setup System Language string systemLang = "C"; try diff --git a/utils/funcexp/func_timediff.cpp b/utils/funcexp/func_timediff.cpp index 1254f2d1b..0e16e3a4f 100644 --- a/utils/funcexp/func_timediff.cpp +++ b/utils/funcexp/func_timediff.cpp @@ -118,7 +118,10 @@ string Func_timediff::getStrVal(rowgroup::Row& row, case execplan::CalpontSystemCatalog::TIME: case execplan::CalpontSystemCatalog::DATETIME: - if (type1 != type2) + // Diff between time and datetime returns NULL in MariaDB + if ((type2 == execplan::CalpontSystemCatalog::TIME || + type2 == execplan::CalpontSystemCatalog::DATETIME) && + type1 != type2) { isNull = true; break; diff --git a/utils/rowgroup/rowgroup.h b/utils/rowgroup/rowgroup.h index d496cbcb0..0990384cb 100644 --- a/utils/rowgroup/rowgroup.h +++ b/utils/rowgroup/rowgroup.h @@ -1771,7 +1771,8 @@ inline void copyRow(const Row& in, Row* out, uint32_t colCount) { if (UNLIKELY(in.getColTypes()[i] == execplan::CalpontSystemCatalog::VARBINARY || in.getColTypes()[i] == execplan::CalpontSystemCatalog::BLOB || - in.getColTypes()[i] == execplan::CalpontSystemCatalog::TEXT)) + in.getColTypes()[i] == execplan::CalpontSystemCatalog::TEXT || + in.getColTypes()[i] == execplan::CalpontSystemCatalog::CLOB)) out->setVarBinaryField(in.getVarBinaryStringField(i), i); else if (UNLIKELY(in.isLongString(i))) //out->setStringField(in.getStringField(i), i); diff --git a/utils/threadpool/threadpool.cpp b/utils/threadpool/threadpool.cpp index 589fb6dcc..e62de7769 100644 --- a/utils/threadpool/threadpool.cpp +++ b/utils/threadpool/threadpool.cpp @@ -409,6 +409,24 @@ void ThreadPool::beginThread() throw() --fIssued; --waitingFunctorsSize; fWaitingFunctors.erase(todo); + if (fDebug) + { + ostringstream oss; + oss << "Ending thread " << " on " << fName + << " max " << fMaxThreads + << " queue " << fQueueSize + << " threads " << fThreadCount + << " running " << fIssued + << " waiting " << (waitingFunctorsSize - fIssued) + << " total " << waitingFunctorsSize; + logging::Message::Args args; + logging::Message message(0); + args.add(oss.str()); + message.format( args ); + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + ml.logWarningMessage( message ); + } } timeout = boost::get_system_time() + boost::posix_time::minutes(10); @@ -536,6 +554,8 @@ void ThreadPoolMonitor::operator()() << setw(4) << tv.tv_usec / 100 << " Name " << fPool->fName << " Active " << fPool->waitingFunctorsSize + << " running " << fPool->fIssued + << " waiting " << (fPool->waitingFunctorsSize - fPool->fIssued) << " ThdCnt " << fPool->fThreadCount << " Max " << fPool->fMaxThreads << " Q " << fPool->fQueueSize diff --git a/writeengine/bulk/we_bulkloadbuffer.cpp b/writeengine/bulk/we_bulkloadbuffer.cpp index 86124a783..fe0eaa70f 100644 --- a/writeengine/bulk/we_bulkloadbuffer.cpp +++ b/writeengine/bulk/we_bulkloadbuffer.cpp @@ -570,14 +570,16 @@ void BulkLoadBuffer::convert(char* field, int fieldLength, } // Swap byte order before comparing character string - int64_t binChar = static_cast( uint64ToStr( - *(reinterpret_cast(charTmpBuf)) ) ); + // Compare must be unsigned + uint64_t compChar = uint64ToStr( *(reinterpret_cast(charTmpBuf)) ); + int64_t binChar = static_cast( compChar ); // Update min/max range - if (binChar < bufStats.minBufferVal) + uint64_t minVal = static_cast( bufStats.minBufferVal ); + uint64_t maxVal = static_cast( bufStats.maxBufferVal ); + if (compChar < minVal) bufStats.minBufferVal = binChar; - - if (binChar > bufStats.maxBufferVal) + if (compChar > maxVal) bufStats.maxBufferVal = binChar; pVal = charTmpBuf; diff --git a/writeengine/dictionary/we_dctnry.cpp b/writeengine/dictionary/we_dctnry.cpp index 460a48191..40d48f8f2 100644 --- a/writeengine/dictionary/we_dctnry.cpp +++ b/writeengine/dictionary/we_dctnry.cpp @@ -423,8 +423,7 @@ int Dctnry::closeDctnry(bool realClose) return rc; //cout <<"Init called! m_dctnryOID =" << m_dctnryOID << endl; - if (realClose) - freeStringCache( ); + freeStringCache( ); return NO_ERROR; }