/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // $Id: crossenginestep.cpp 9709 2013-07-20 06:08:46Z xlou $ #define PREFER_MY_CONFIG_H #include "crossenginestep.h" #include //#define NDEBUG #include #include #include using namespace std; #include #include using namespace boost; #include "messagequeue.h" using namespace messageqcpp; #include "loggingid.h" #include "errorcodes.h" #include "idberrorinfo.h" #include "errorids.h" using namespace logging; #include "calpontsystemcatalog.h" #include "constantcolumn.h" using namespace execplan; #include "rowgroup.h" using namespace rowgroup; #include "querytele.h" using namespace querytele; #include "funcexp.h" #include "jobstep.h" #include "jlf_common.h" #include "libmysql_client.h" namespace joblist { CrossEngineStep::CrossEngineStep(const std::string& schema, const std::string& table, const std::string& alias, const JobInfo& jobInfo) : BatchPrimitive(jobInfo) , fRowsRetrieved(0) , fRowsReturned(0) , fRowsPerGroup(256) , fOutputDL(NULL) , fOutputIterator(0) , fRunner(0) , fEndOfResult(false) , fSchema(schema) , fTable(table) , fAlias(alias) , fColumnCount(0) , fFeInstance(funcexp::FuncExp::instance()) { fExtendedInfo = "CES: "; getMysqldInfo(jobInfo); fQtc.stepParms().stepType = StepTeleStats::T_CES; mysql = new utils::LibMySQL(); } CrossEngineStep::~CrossEngineStep() { delete mysql; } void CrossEngineStep::setOutputRowGroup(const rowgroup::RowGroup& rg) { fRowGroupOut = fRowGroupDelivered = fRowGroupAdded = rg; } void CrossEngineStep::deliverStringTableRowGroup(bool b) { // results are either using setField, or mapping to delivered row group fRowGroupDelivered.setUseStringTable(b); } bool CrossEngineStep::deliverStringTableRowGroup() const { return fRowGroupDelivered.usesStringTable(); } void CrossEngineStep::addFcnJoinExp(const std::vector& fe) { fFeFcnJoin = fe; } void CrossEngineStep::addFcnExpGroup1(const boost::shared_ptr& fe) { fFeFilters.push_back(fe); } void CrossEngineStep::setFE1Input(const rowgroup::RowGroup& rg) { fRowGroupFe1 = rg; } void CrossEngineStep::setFcnExpGroup3(const std::vector& fe) { fFeSelects = fe; } void CrossEngineStep::setFE23Output(const rowgroup::RowGroup& rg) { fRowGroupFe3 = fRowGroupDelivered = fRowGroupAdded = rg; } void CrossEngineStep::makeMappings() { fFe1Column.reset(new int[fColumnCount]); for (uint64_t i = 0; i < fColumnCount; ++i) fFe1Column[i] = -1; if (fFeFilters.size() > 0 || fFeFcnJoin.size() > 0) { const std::vector& colInFe1 = fRowGroupFe1.getKeys(); for (uint64_t i = 0; i < colInFe1.size(); i++) { map::iterator it = fColumnMap.find(colInFe1[i]); if (it != fColumnMap.end()) fFe1Column[it->second] = i; } fFeMapping1 = makeMapping(fRowGroupFe1, fRowGroupOut); } if (!fFeSelects.empty()) fFeMapping3 = makeMapping(fRowGroupOut, fRowGroupFe3); } void CrossEngineStep::setField(int i, const char* value, unsigned long length, MYSQL_FIELD* field, Row& row) { CalpontSystemCatalog::ColDataType colType = row.getColType(i); if ((colType == CalpontSystemCatalog::CHAR || colType == CalpontSystemCatalog::VARCHAR) && row.getColumnWidth(i) > 8) { if (value != NULL) row.setStringField((const uint8_t*)value, length, i); else row.setStringField(nullptr, 0, i); } else if ((colType == CalpontSystemCatalog::BLOB) || (colType == CalpontSystemCatalog::TEXT) || (colType == CalpontSystemCatalog::VARBINARY)) { if (value != NULL) row.setVarBinaryField((const uint8_t*)value, length, i); else row.setVarBinaryField(NULL, 0, i); } else { CalpontSystemCatalog::ColType ct; ct.colDataType = colType; ct.colWidth = row.getColumnWidth(i); if (colType == CalpontSystemCatalog::DECIMAL || colType == CalpontSystemCatalog::UDECIMAL) { ct.scale = field->decimals; ct.precision = field->length; if (ct.colWidth == datatypes::MAXDECIMALWIDTH) { row.setInt128Field(convertValueNum(value, ct), i); return; } } else { ct.scale = row.getScale(i); ct.precision = row.getPrecision(i); } int64_t v = convertValueNum(value, ct); row.setIntField(v, i); } } inline void CrossEngineStep::addRow(RGData& data) { fRowDelivered.setRid(fRowsReturned % fRowsPerGroup); fRowDelivered.nextRow(); fRowGroupAdded.incRowCount(); if (++fRowsReturned % fRowsPerGroup == 0) { fOutputDL->insert(data); data.reinit(fRowGroupAdded, fRowsPerGroup); fRowGroupAdded.setData(&data); fRowGroupAdded.resetRowGroup(fRowsReturned); fRowGroupAdded.getRow(0, &fRowDelivered); } } // simplified version of convertValueNum() in jlf_execplantojoblist.cpp. template T CrossEngineStep::convertValueNum(const char* str, const CalpontSystemCatalog::ColType& ct) { T rv = 0; bool pushWarning = false; bool nullFlag = (str == NULL); boost::any anyVal; anyVal = ct.convertColumnData((nullFlag ? "" : str), pushWarning, fTimeZone, nullFlag, true, false); // Out of range values are treated as NULL as discussed during design review. if (pushWarning) { anyVal = ct.getNullValueForType(); } switch (ct.colDataType) { case CalpontSystemCatalog::BIT: rv = boost::any_cast(anyVal); break; case CalpontSystemCatalog::TINYINT: #if BOOST_VERSION >= 105200 rv = boost::any_cast(anyVal); #else rv = boost::any_cast(anyVal); #endif break; case CalpontSystemCatalog::UTINYINT: rv = boost::any_cast(anyVal); break; case CalpontSystemCatalog::SMALLINT: rv = boost::any_cast(anyVal); break; case CalpontSystemCatalog::USMALLINT: rv = boost::any_cast(anyVal); break; case CalpontSystemCatalog::MEDINT: case CalpontSystemCatalog::INT: rv = boost::any_cast(anyVal); break; case CalpontSystemCatalog::UMEDINT: case CalpontSystemCatalog::UINT: rv = boost::any_cast(anyVal); break; case CalpontSystemCatalog::BIGINT: rv = boost::any_cast(anyVal); break; case CalpontSystemCatalog::UBIGINT: rv = boost::any_cast(anyVal); break; case CalpontSystemCatalog::FLOAT: case CalpontSystemCatalog::UFLOAT: { float f = boost::any_cast(anyVal); // N.B. There is a bug in boost::any or in gcc where, if you store a nan, // you will get back a nan, but not necessarily the same bits that you put in. // This only seems to be for float (double seems to work). if (isnan(f)) { uint32_t ti = joblist::FLOATNULL; float* tfp = (float*)&ti; f = *tfp; } float* fp = &f; int32_t* ip = reinterpret_cast(fp); rv = *ip; } break; case CalpontSystemCatalog::DOUBLE: case CalpontSystemCatalog::UDOUBLE: { double d = boost::any_cast(anyVal); double* dp = &d; int64_t* ip = reinterpret_cast(dp); rv = *ip; } break; case CalpontSystemCatalog::CHAR: case CalpontSystemCatalog::VARCHAR: case CalpontSystemCatalog::VARBINARY: case CalpontSystemCatalog::BLOB: case CalpontSystemCatalog::TEXT: case CalpontSystemCatalog::CLOB: { if (nullFlag) { rv = joblist::CHAR8NULL; // SZ: I hate that. } else { std::string i = boost::any_cast(anyVal); // bug 1932, pad nulls up to the size of v i.resize(sizeof(rv), 0); rv = *((uint64_t*)i.data()); } } break; case CalpontSystemCatalog::DATE: rv = boost::any_cast(anyVal); break; case CalpontSystemCatalog::DATETIME: rv = boost::any_cast(anyVal); break; case CalpontSystemCatalog::TIMESTAMP: rv = boost::any_cast(anyVal); break; case CalpontSystemCatalog::TIME: rv = boost::any_cast(anyVal); break; case CalpontSystemCatalog::DECIMAL: case CalpontSystemCatalog::UDECIMAL: if (LIKELY(ct.colWidth == datatypes::MAXDECIMALWIDTH)) rv = boost::any_cast(anyVal); else if (ct.colWidth == execplan::CalpontSystemCatalog::EIGHT_BYTE) rv = boost::any_cast(anyVal); else if (ct.colWidth == execplan::CalpontSystemCatalog::FOUR_BYTE) rv = boost::any_cast(anyVal); else if (ct.colWidth == execplan::CalpontSystemCatalog::TWO_BYTE) rv = boost::any_cast(anyVal); else if (ct.colWidth == execplan::CalpontSystemCatalog::ONE_BYTE) rv = boost::any_cast(anyVal); break; default: break; } return rv; } void CrossEngineStep::getMysqldInfo(const JobInfo& jobInfo) { if (jobInfo.rm->getMysqldInfo(fHost, fUser, fPasswd, fPort) == false) throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_CROSS_ENGINE_CONFIG), ERR_CROSS_ENGINE_CONFIG); } void CrossEngineStep::run() { // idbassert(!fDelivery); if (fOutputJobStepAssociation.outSize() == 0) throw logic_error("No output data list for cross engine step."); fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL(); if (fOutputDL == NULL) throw logic_error("Output is not a RowGroup data list."); if (fDelivery == true) { fOutputIterator = fOutputDL->getIterator(); } fRunner = jobstepThreadPool.invoke(Runner(this)); } void CrossEngineStep::join() { if (fRunner) jobstepThreadPool.join(fRunner); } void CrossEngineStep::execute() { int ret = 0; StepTeleStats sts; sts.query_uuid = fQueryUuid; sts.step_uuid = fStepUuid; try { sts.msg_type = StepTeleStats::ST_START; sts.total_units_of_work = 1; postStepStartTele(sts); ret = mysql->init(fHost.c_str(), fPort, fUser.c_str(), fPasswd.c_str(), fSchema.c_str()); if (ret != 0) mysql->handleMySqlError(mysql->getError().c_str(), ret); std::string query(makeQuery()); fLogger->logMessage(logging::LOG_TYPE_INFO, "QUERY to foreign engine: " + query); if (traceOn()) cout << "QUERY: " << query << endl; ret = mysql->run(query.c_str()); if (ret != 0) mysql->handleMySqlError(mysql->getError().c_str(), ret); int num_fields = mysql->getFieldCount(); char** rowIn; // input RGData rgDataDelivered; fRowGroupAdded.initRow(&fRowDelivered); // use getDataSize() i/o getMaxDataSize() to make sure there are 8192 rows. rgDataDelivered.reinit(fRowGroupAdded, fRowsPerGroup); fRowGroupAdded.setData(&rgDataDelivered); fRowGroupAdded.resetRowGroup(0); fRowGroupAdded.getRow(0, &fRowDelivered); if (traceOn()) dlTimes.setFirstReadTime(); // Any functions to evaluate makeMappings(); bool doFE1 = ((fFeFcnJoin.size() > 0) || (fFeFilters.size() > 0)); bool doFE3 = (fFeSelects.size() > 0); if (!doFE1 && !doFE3) { while ((rowIn = mysql->nextRow()) && !cancelled()) { for (int i = 0; i < num_fields; i++) setField(i, rowIn[i], mysql->getFieldLength(i), mysql->getField(i), fRowDelivered); addRow(rgDataDelivered); } } else if (doFE1 && !doFE3) // FE in WHERE clause only { std::shared_ptr rgDataFe1; // functions in where clause Row rowFe1; // row for fe evaluation fRowGroupFe1.initRow(&rowFe1, true); rgDataFe1.reset(new uint8_t[rowFe1.getSize()]); rowFe1.setData(rowgroup::Row::Pointer(rgDataFe1.get())); while ((rowIn = mysql->nextRow()) && !cancelled()) { // Parse the columns used in FE1 first, the other column may not need be parsed. for (int i = 0; i < num_fields; i++) { if (fFe1Column[i] != -1) setField(fFe1Column[i], rowIn[i], mysql->getFieldLength(i), mysql->getField(i), rowFe1); } if (fFeFilters.size() > 0) { bool feBreak = false; for (std::vector >::iterator it = fFeFilters.begin(); it != fFeFilters.end(); it++) { if (fFeInstance->evaluate(rowFe1, (*it).get()) == false) { feBreak = true; break; } } if (feBreak) continue; } // evaluate the FE join column fFeInstance->evaluate(rowFe1, fFeFcnJoin); // Pass throug the parsed columns, and parse the remaining columns. applyMapping(fFeMapping1, rowFe1, &fRowDelivered); for (int i = 0; i < num_fields; i++) { if (fFe1Column[i] == -1) setField(i, rowIn[i], mysql->getFieldLength(i), mysql->getField(i), fRowDelivered); } addRow(rgDataDelivered); } } else if (!doFE1 && doFE3) // FE in SELECT clause only { std::shared_ptr rgDataFe3; // functions in select clause Row rowFe3; // row for fe evaluation fRowGroupOut.initRow(&rowFe3, true); rgDataFe3.reset(new uint8_t[rowFe3.getSize()]); rowFe3.setData(rowgroup::Row::Pointer(rgDataFe3.get())); while ((rowIn = mysql->nextRow()) && !cancelled()) { for (int i = 0; i < num_fields; i++) setField(i, rowIn[i], mysql->getFieldLength(i), mysql->getField(i), rowFe3); fFeInstance->evaluate(rowFe3, fFeSelects); applyMapping(fFeMapping3, rowFe3, &fRowDelivered); addRow(rgDataDelivered); } } else // FE in SELECT clause, FE join and WHERE clause { std::shared_ptr rgDataFe1; // functions in where clause Row rowFe1; // row for fe1 evaluation fRowGroupFe1.initRow(&rowFe1, true); rgDataFe1.reset(new uint8_t[rowFe1.getSize()]); rowFe1.setData(rowgroup::Row::Pointer(rgDataFe1.get())); std::shared_ptr rgDataFe3; // functions in select clause Row rowFe3; // row for fe3 evaluation fRowGroupOut.initRow(&rowFe3, true); rgDataFe3.reset(new uint8_t[rowFe3.getSize()]); rowFe3.setData(rowgroup::Row::Pointer(rgDataFe3.get())); while ((rowIn = mysql->nextRow()) && !cancelled()) { // Parse the columns used in FE1 first, the other column may not need be parsed. for (int i = 0; i < num_fields; i++) { if (fFe1Column[i] != -1) setField(fFe1Column[i], rowIn[i], mysql->getFieldLength(i), mysql->getField(i), rowFe1); } if (fFeFilters.size() > 0) { bool feBreak = false; for (std::vector >::iterator it = fFeFilters.begin(); it != fFeFilters.end(); it++) { if (fFeInstance->evaluate(rowFe1, (*it).get()) == false) { feBreak = true; break; } } if (feBreak) continue; } // evaluate the FE join column fFeInstance->evaluate(rowFe1, fFeFcnJoin); // Pass throug the parsed columns, and parse the remaining columns. applyMapping(fFeMapping1, rowFe1, &rowFe3); for (int i = 0; i < num_fields; i++) { if (fFe1Column[i] == -1) setField(i, rowIn[i], mysql->getFieldLength(i), mysql->getField(i), rowFe3); } fFeInstance->evaluate(rowFe3, fFeSelects); applyMapping(fFeMapping3, rowFe3, &fRowDelivered); addRow(rgDataDelivered); } } // INSERT_ADAPTER(fOutputDL, rgDataDelivered); fOutputDL->insert(rgDataDelivered); fRowsRetrieved = mysql->getRowCount(); } catch (...) { handleException(std::current_exception(), logging::ERR_CROSS_ENGINE_CONNECT, logging::ERR_ALWAYS_CRITICAL, "CrossEngineStep::execute()"); } sts.msg_type = StepTeleStats::ST_SUMMARY; sts.total_units_of_work = sts.units_of_work_completed = 1; sts.rows = fRowsReturned; postStepSummaryTele(sts); fEndOfResult = true; fOutputDL->endOfInput(); // Bug 3136, let mini stats to be formatted if traceOn. if (traceOn()) { dlTimes.setLastReadTime(); dlTimes.setEndOfInputTime(); printCalTrace(); } } void CrossEngineStep::setBPP(JobStep* jobStep) { pColStep* pcs = dynamic_cast(jobStep); pColScanStep* pcss = NULL; pDictionaryStep* pds = NULL; pDictionaryScan* pdss = NULL; FilterStep* fs = NULL; std::string bop = " AND "; if (pcs != 0) { if (dynamic_cast(pcs) != NULL) throw logic_error("No Psedo Column for foreign engine."); if (pcs->BOP() == BOP_OR) bop = " OR "; addFilterStr(pcs->getFilters(), bop); } else if ((pcss = dynamic_cast(jobStep)) != NULL) { if (pcss->BOP() == BOP_OR) bop = " OR "; addFilterStr(pcss->getFilters(), bop); } else if ((pds = dynamic_cast(jobStep)) != NULL) { if (pds->BOP() == BOP_OR) bop = " OR "; addFilterStr(pds->getFilters(), bop); } else if ((pdss = dynamic_cast(jobStep)) != NULL) { if (pds->BOP() == BOP_OR) bop = " OR "; addFilterStr(pdss->getFilters(), bop); } else if ((fs = dynamic_cast(jobStep)) != NULL) { addFilterStr(fs->getFilters(), bop); } } void CrossEngineStep::addFilterStr(const std::vector& f, const std::string& bop) { if (f.size() == 0) return; std::string filterStr; for (uint64_t i = 0; i < f.size(); i++) { if (f[i]->data().empty()) continue; if (!filterStr.empty()) filterStr += bop; filterStr += f[i]->data(); } if (!filterStr.empty()) { if (!fWhereClause.empty()) fWhereClause += " AND (" + filterStr + ")"; else fWhereClause += " WHERE (" + filterStr + ")"; } } void CrossEngineStep::setProjectBPP(JobStep* jobStep1, JobStep*) { fColumnMap[jobStep1->tupleId()] = fColumnCount++; if (!fSelectClause.empty()) fSelectClause += ", "; else fSelectClause += "SELECT "; fSelectClause += "`" + jobStep1->name() + "`"; } std::string CrossEngineStep::makeQuery() { ostringstream oss; oss << fSelectClause << " FROM `" << fTable << "`"; if (fTable.compare(fAlias) != 0) oss << " `" << fAlias << "`"; if (!fWhereClause.empty()) oss << fWhereClause; // the std::string must consist of a single SQL statement without a terminating semicolon ; or \g. // oss << ";"; return oss.str(); } const RowGroup& CrossEngineStep::getOutputRowGroup() const { return fRowGroupOut; } const RowGroup& CrossEngineStep::getDeliveredRowGroup() const { return fRowGroupDelivered; } uint32_t CrossEngineStep::nextBand(messageqcpp::ByteStream& bs) { RGData rgDataOut; bool more = false; uint32_t rowCount = 0; try { bs.restart(); more = fOutputDL->next(fOutputIterator, &rgDataOut); if (traceOn() && dlTimes.FirstReadTime().tv_sec == 0) dlTimes.setFirstReadTime(); if (more && !cancelled()) { fRowGroupDelivered.setData(&rgDataOut); fRowGroupDelivered.serializeRGData(bs); rowCount = fRowGroupDelivered.getRowCount(); } else { while (more) more = fOutputDL->next(fOutputIterator, &rgDataOut); fEndOfResult = true; } } catch (...) { handleException(std::current_exception(), logging::ERR_IN_DELIVERY, logging::ERR_ALWAYS_CRITICAL, "CrossEngineStep::nextBand()"); while (more) more = fOutputDL->next(fOutputIterator, &rgDataOut); fEndOfResult = true; } if (fEndOfResult) { // send an empty / error band rgDataOut.reinit(fRowGroupDelivered, 0); fRowGroupDelivered.setData(&rgDataOut); fRowGroupDelivered.resetRowGroup(0); fRowGroupDelivered.setStatus(status()); fRowGroupDelivered.serializeRGData(bs); if (traceOn()) { dlTimes.setLastReadTime(); dlTimes.setEndOfInputTime(); } if (traceOn()) printCalTrace(); } return rowCount; } const std::string CrossEngineStep::toString() const { ostringstream oss; oss << "CrossEngineStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId; oss << " in:"; for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++) oss << fInputJobStepAssociation.outAt(i); oss << " out:"; for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++) oss << fOutputJobStepAssociation.outAt(i); oss << endl; return oss.str(); } void CrossEngineStep::printCalTrace() { time_t t = time(0); char timeString[50]; ctime_r(&t, timeString); timeString[strlen(timeString) - 1] = '\0'; ostringstream logStr; logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString << "; rows retrieved-" << fRowsRetrieved << "; total rows returned-" << fRowsReturned << endl << "\t1st read " << dlTimes.FirstReadTimeString() << "; EOI " << dlTimes.EndOfInputTimeString() << "; runtime-" << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl << "\tJob completion status " << status() << endl; logEnd(logStr.str().c_str()); fExtendedInfo += logStr.str(); formatMiniStats(); } void CrossEngineStep::formatMiniStats() { ostringstream oss; oss << "CES " << "UM " << "- " << "- " << "- " << "- " << "- " << "- " << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " " << fRowsReturned << " "; fMiniInfo += oss.str(); } } // namespace joblist