From 53af74b0274df2863d866655abd5a4c646196ca6 Mon Sep 17 00:00:00 2001 From: "David.Hall" Date: Mon, 9 Jan 2023 13:59:26 -0600 Subject: [PATCH] MCOL-1170 Fix ANALYZE to not error (#2682) Analyze needs to be completed differently than a normal query. In server, when an ANALYZE is seen, it calls init_scan() immediatly followed by end_scan(). This leaves the sqlfrontendsession (ExeMgr) in a state where it expects to return rows. This patch fixes end_scan to clean this up via reads and writes to get everything back in synch. ANALYZE should display the number of rows to be displayed if the query were run normally. We have that information available, but no way to return it. A modification to server side to ask for that in the handler is required. This patch also includes a beautification of sqlfrontsessionthread.cpp since it looked bad. The important change is at line 774 if (!swallowRows) which short circuits the actual return of data --- .gitignore | 4 + dbcon/mysql/ha_mcs_impl.cpp | 23 ++++- primitives/primproc/sqlfrontsessionthread.cpp | 99 ++++++++++--------- 3 files changed, 79 insertions(+), 47 deletions(-) diff --git a/.gitignore b/.gitignore index 3ed72171a..1661c661a 100644 --- a/.gitignore +++ b/.gitignore @@ -177,3 +177,7 @@ build/Testing/ tests/*\[1\]_tests.cmake tests/*\[1\]_include.cmake .boost +*.vtg +*.vtg-back +'*.vtg-Stashed changes' + diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index e4bf08f04..53ffe0cb2 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -1989,6 +1989,7 @@ bool sendExecutionPlanToExeMgr(sm::cpsm_conhdl_t* hndl, ByteStream::quadbyte qb, } // namespace +// Called only for ANALYZE TABLE int ha_mcs_impl_analyze(THD* thd, TABLE* table) { uint32_t sessionID = execplan::CalpontSystemCatalog::idb_tid2sid(thd->thread_id); @@ -2685,6 +2686,26 @@ int ha_mcs_impl_rnd_end(TABLE* table, bool is_pushdown_hand) ci = reinterpret_cast(get_fe_conn_info_ptr()); } + if (thd->lex->analyze_stmt && ci->cal_conn_hndl && ci->cal_conn_hndl->exeMgr) + { + // The ANALYZE statement leaves ExeMgr hanging. This clears it up. + ci->cal_conn_hndl->exeMgr->read(); // Ignore the returned buffer + ByteStream msg; + ByteStream::quadbyte qb = 1; // Tell PrimProc front session to eat all the rows + msg << qb; + ci->cal_conn_hndl->exeMgr->write(msg); + // This is the command to start sending return values. because we previously sent the swallow + // rows command, there won't be anything useful coming back, but it needs this to flush internal queues. + qb = 5; // Read the result data. + msg.reset(); + msg << qb; + ci->cal_conn_hndl->exeMgr->write(msg); + qb = 0; // End the query + msg.reset(); + msg << qb; + ci->cal_conn_hndl->exeMgr->write(msg); + } + if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) { force_close_fep_conn(thd, ci); @@ -4813,7 +4834,7 @@ int ha_mcs_impl_group_by_end(TABLE* table) * Execute the query and saves derived table query. * There is an extra handler argument so I ended up with a * new init function. The code is a copy of - * ha_mcs_impl_rnd_init() mostly. + * impl_rnd_init() mostly. * PARAMETERS: * mcs_handler_info* pnt to an envelope struct * TABLE* table - dest table to put the results into diff --git a/primitives/primproc/sqlfrontsessionthread.cpp b/primitives/primproc/sqlfrontsessionthread.cpp index 48bb9577a..5b43746f0 100644 --- a/primitives/primproc/sqlfrontsessionthread.cpp +++ b/primitives/primproc/sqlfrontsessionthread.cpp @@ -420,6 +420,11 @@ void SQLFrontSessionThread::operator()() analyzeTableHandleStats(bs); continue; } + else if (qb == 0) + { + // 0 => Nothing left to do. Sent by rnd_end() just to be sure. + continue; + } else { if (gDebug) @@ -765,62 +770,64 @@ void SQLFrontSessionThread::operator()() bs << errInfo->errorMsg(jl->status()); } - try // @bug2244: try/catch around fIos.write() calls projecting rows + if (!swallowRows) { - if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3) + try // @bug2244: try/catch around fIos.write() calls projecting rows { - // Skip the write to the front end until the last empty band. Used to time queries - // through without any front end waiting. - if (tableOID < 3000 || rowCount == 0) + if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3) + { + // Skip the write to the front end until the last empty band. Used to time queries + // through without any front end waiting. + if (tableOID < 3000 || rowCount == 0) + fIos.write(bs); + } + else + { fIos.write(bs); + } } - else + catch (std::exception& ex) { - fIos.write(bs); + msgHandler.stop(); + std::ostringstream errMsg; + errMsg << "ExeMgr: error projecting rows " + "for tableOID: " + << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount << "; " + << ex.what(); + jl->abort(); + + while (rowCount) + rowCount = jl->projectTable(tableOID, bs); + + if (tableOID == 100 && msgHandler.aborted()) + { + /* TODO: modularize the cleanup code, as well as + * the rest of this fcn */ + + decThreadCntPerSession(csep.sessionID() | 0x80000000); + statementsRunningCount->decr(stmtCounted); + fIos.close(); + return; + } + + // std::cout << "connection drop\n"; + throw std::runtime_error(errMsg.str()); } - } - catch (std::exception& ex) - { - msgHandler.stop(); - std::ostringstream errMsg; - errMsg << "ExeMgr: error projecting rows " - "for tableOID: " - << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount << "; " - << ex.what(); - jl->abort(); - - while (rowCount) - rowCount = jl->projectTable(tableOID, bs); - - if (tableOID == 100 && msgHandler.aborted()) + catch (...) { - /* TODO: modularize the cleanup code, as well as - * the rest of this fcn */ + std::ostringstream errMsg; + msgHandler.stop(); + errMsg << "ExeMgr: unknown error projecting rows " + "for tableOID: " + << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount; + jl->abort(); - decThreadCntPerSession(csep.sessionID() | 0x80000000); - statementsRunningCount->decr(stmtCounted); - fIos.close(); - return; + while (rowCount) + rowCount = jl->projectTable(tableOID, bs); + + throw std::runtime_error(errMsg.str()); } - - // std::cout << "connection drop\n"; - throw std::runtime_error(errMsg.str()); } - catch (...) - { - std::ostringstream errMsg; - msgHandler.stop(); - errMsg << "ExeMgr: unknown error projecting rows " - "for tableOID: " - << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount; - jl->abort(); - - while (rowCount) - rowCount = jl->projectTable(tableOID, bs); - - throw std::runtime_error(errMsg.str()); - } - totalRowCount += rowCount; totalBytesSent += bs.length();