diff --git a/.gitignore b/.gitignore index 3ed72171a..1661c661a 100644 --- a/.gitignore +++ b/.gitignore @@ -177,3 +177,7 @@ build/Testing/ tests/*\[1\]_tests.cmake tests/*\[1\]_include.cmake .boost +*.vtg +*.vtg-back +'*.vtg-Stashed changes' + diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index e4bf08f04..53ffe0cb2 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -1989,6 +1989,7 @@ bool sendExecutionPlanToExeMgr(sm::cpsm_conhdl_t* hndl, ByteStream::quadbyte qb, } // namespace +// Called only for ANALYZE TABLE int ha_mcs_impl_analyze(THD* thd, TABLE* table) { uint32_t sessionID = execplan::CalpontSystemCatalog::idb_tid2sid(thd->thread_id); @@ -2685,6 +2686,26 @@ int ha_mcs_impl_rnd_end(TABLE* table, bool is_pushdown_hand) ci = reinterpret_cast(get_fe_conn_info_ptr()); } + if (thd->lex->analyze_stmt && ci->cal_conn_hndl && ci->cal_conn_hndl->exeMgr) + { + // The ANALYZE statement leaves ExeMgr hanging. This clears it up. + ci->cal_conn_hndl->exeMgr->read(); // Ignore the returned buffer + ByteStream msg; + ByteStream::quadbyte qb = 1; // Tell PrimProc front session to eat all the rows + msg << qb; + ci->cal_conn_hndl->exeMgr->write(msg); + // This is the command to start sending return values. because we previously sent the swallow + // rows command, there won't be anything useful coming back, but it needs this to flush internal queues. + qb = 5; // Read the result data. + msg.reset(); + msg << qb; + ci->cal_conn_hndl->exeMgr->write(msg); + qb = 0; // End the query + msg.reset(); + msg << qb; + ci->cal_conn_hndl->exeMgr->write(msg); + } + if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) { force_close_fep_conn(thd, ci); @@ -4813,7 +4834,7 @@ int ha_mcs_impl_group_by_end(TABLE* table) * Execute the query and saves derived table query. * There is an extra handler argument so I ended up with a * new init function. The code is a copy of - * ha_mcs_impl_rnd_init() mostly. + * impl_rnd_init() mostly. * PARAMETERS: * mcs_handler_info* pnt to an envelope struct * TABLE* table - dest table to put the results into diff --git a/primitives/primproc/sqlfrontsessionthread.cpp b/primitives/primproc/sqlfrontsessionthread.cpp index 48bb9577a..5b43746f0 100644 --- a/primitives/primproc/sqlfrontsessionthread.cpp +++ b/primitives/primproc/sqlfrontsessionthread.cpp @@ -420,6 +420,11 @@ void SQLFrontSessionThread::operator()() analyzeTableHandleStats(bs); continue; } + else if (qb == 0) + { + // 0 => Nothing left to do. Sent by rnd_end() just to be sure. + continue; + } else { if (gDebug) @@ -765,62 +770,64 @@ void SQLFrontSessionThread::operator()() bs << errInfo->errorMsg(jl->status()); } - try // @bug2244: try/catch around fIos.write() calls projecting rows + if (!swallowRows) { - if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3) + try // @bug2244: try/catch around fIos.write() calls projecting rows { - // Skip the write to the front end until the last empty band. Used to time queries - // through without any front end waiting. - if (tableOID < 3000 || rowCount == 0) + if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3) + { + // Skip the write to the front end until the last empty band. Used to time queries + // through without any front end waiting. + if (tableOID < 3000 || rowCount == 0) + fIos.write(bs); + } + else + { fIos.write(bs); + } } - else + catch (std::exception& ex) { - fIos.write(bs); + msgHandler.stop(); + std::ostringstream errMsg; + errMsg << "ExeMgr: error projecting rows " + "for tableOID: " + << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount << "; " + << ex.what(); + jl->abort(); + + while (rowCount) + rowCount = jl->projectTable(tableOID, bs); + + if (tableOID == 100 && msgHandler.aborted()) + { + /* TODO: modularize the cleanup code, as well as + * the rest of this fcn */ + + decThreadCntPerSession(csep.sessionID() | 0x80000000); + statementsRunningCount->decr(stmtCounted); + fIos.close(); + return; + } + + // std::cout << "connection drop\n"; + throw std::runtime_error(errMsg.str()); } - } - catch (std::exception& ex) - { - msgHandler.stop(); - std::ostringstream errMsg; - errMsg << "ExeMgr: error projecting rows " - "for tableOID: " - << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount << "; " - << ex.what(); - jl->abort(); - - while (rowCount) - rowCount = jl->projectTable(tableOID, bs); - - if (tableOID == 100 && msgHandler.aborted()) + catch (...) { - /* TODO: modularize the cleanup code, as well as - * the rest of this fcn */ + std::ostringstream errMsg; + msgHandler.stop(); + errMsg << "ExeMgr: unknown error projecting rows " + "for tableOID: " + << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount; + jl->abort(); - decThreadCntPerSession(csep.sessionID() | 0x80000000); - statementsRunningCount->decr(stmtCounted); - fIos.close(); - return; + while (rowCount) + rowCount = jl->projectTable(tableOID, bs); + + throw std::runtime_error(errMsg.str()); } - - // std::cout << "connection drop\n"; - throw std::runtime_error(errMsg.str()); } - catch (...) - { - std::ostringstream errMsg; - msgHandler.stop(); - errMsg << "ExeMgr: unknown error projecting rows " - "for tableOID: " - << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount; - jl->abort(); - - while (rowCount) - rowCount = jl->projectTable(tableOID, bs); - - throw std::runtime_error(errMsg.str()); - } - totalRowCount += rowCount; totalBytesSent += bs.length();