diff --git a/dbcon/mysql/ha_calpont_impl.cpp b/dbcon/mysql/ha_calpont_impl.cpp index 02164e239..ae1904dee 100644 --- a/dbcon/mysql/ha_calpont_impl.cpp +++ b/dbcon/mysql/ha_calpont_impl.cpp @@ -179,6 +179,82 @@ inline uint32_t tid2sid(const uint32_t tid) return CalpontSystemCatalog::idb_tid2sid(tid); } + +/** + @brief + Wrapper around logging facility. + + @details + Reduces the boiler plate code. + + Called from number of places(mostly DML) in + ha_calpont_impl.cpp(). +*/ +void log_this(THD *thd, const char *message, + logging::LOG_TYPE log_type, unsigned sid) +{ + // corresponds with dbcon in SubsystemID vector + // in messagelog.cpp + unsigned int subSystemId = 24; + logging::LoggingID logid( subSystemId, sid, 0); + logging::Message::Args args1; + logging::Message msg(1); + args1.add(message); + msg.format( args1 ); + Logger logger(logid.fSubsysID); + logger.logMessage(log_type, msg, logid); +} + +/** + @brief + Forcely close a FEP connection. + + @details + Plugin code opens network connection with ExMgr to + get: + the result of meta-data queries + the result of DML or DQL query in any mode + statistics + This code allows to explicitly close the connection + if any error happens using a non-existing protocol + code 0. This causes ExeMgr loop to drop the + connection. + + Called from many places in ha_calpont_impl.cpp(). +*/ +void force_close_fep_conn(THD *thd, cal_connection_info* ci, bool check_prev_rc = false) +{ + if (!ci->cal_conn_hndl) + { + return; + } + + if(check_prev_rc && !ci->rc) + { + return; + } + + // send ExeMgr an unknown signal to force him to close + // the connection + ByteStream msg; + ByteStream::quadbyte qb = 0; + msg << qb; + + try + { + ci->cal_conn_hndl->exeMgr->write(msg); + } + catch (...) + { + // Add details into the message. + log_this(thd, "Exception in force_close_fep_conn().", + logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id)); + } + + sm::sm_cleanup(ci->cal_conn_hndl); + ci->cal_conn_hndl = 0; +} + void storeNumericField(Field** f, int64_t value, CalpontSystemCatalog::ColType& ct) { // unset null bit first @@ -2119,26 +2195,7 @@ int ha_calpont_impl_rnd_init(TABLE* table) if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) { - if (ci->cal_conn_hndl) - { - // send ExeMgr a signal before closing the connection - ByteStream msg; - ByteStream::quadbyte qb = 0; - msg << qb; - - try - { - ci->cal_conn_hndl->exeMgr->write(msg); - } - catch (...) - { - // canceling query. ignore connection failure. - } - - sm::sm_cleanup(ci->cal_conn_hndl); - ci->cal_conn_hndl = 0; - } - + force_close_fep_conn(thd, ci); return 0; } @@ -2650,26 +2707,7 @@ int ha_calpont_impl_rnd_next(uchar* buf, TABLE* table) // @bug 3078 if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) { - if (ci->cal_conn_hndl) - { - // send ExeMgr a signal before cloing the connection - ByteStream msg; - ByteStream::quadbyte qb = 0; - msg << qb; - - try - { - ci->cal_conn_hndl->exeMgr->write(msg); - } - catch (...) - { - // cancel query. ignore connection failure. - } - - sm::sm_cleanup(ci->cal_conn_hndl); - ci->cal_conn_hndl = 0; - } - + force_close_fep_conn(thd, ci); return 0; } @@ -2767,27 +2805,7 @@ int ha_calpont_impl_rnd_end(TABLE* table) if (((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) ) { - // @bug 4022. error handling for select part of dml - if (ci->cal_conn_hndl && ci->rc) - { - // send ExeMgr a signal before closing the connection - ByteStream msg; - ByteStream::quadbyte qb = 0; - msg << qb; - - try - { - ci->cal_conn_hndl->exeMgr->write(msg); - } - catch (...) - { - // this is error handling, so ignore connection failure. - } - - sm::sm_cleanup(ci->cal_conn_hndl); - ci->cal_conn_hndl = 0; - return rc; - } + force_close_fep_conn(thd, ci, true); // checking prev command rc } if (!ci) @@ -2802,29 +2820,9 @@ int ha_calpont_impl_rnd_end(TABLE* table) (thd->lex)->sql_command != SQLCOM_INSERT_SELECT && thd->variables.select_limit != (uint64_t) - 1)) { - if (ci->cal_conn_hndl) - { - // send ExeMgr a signal before closing the connection - ByteStream msg; - ByteStream::quadbyte qb = 0; - msg << qb; - - try - { - ci->cal_conn_hndl->exeMgr->write(msg); - } - catch (...) - { - // this is the end of query. Ignore the exception if exemgr connection failed - // for whatever reason. - } - - sm::sm_cleanup(ci->cal_conn_hndl); - ci->cal_conn_hndl = 0; - // clear querystats because no query stats available for cancelled query - ci->queryStats = ""; - } - + force_close_fep_conn(thd, ci); + // clear querystats because no query stats available for cancelled query + ci->queryStats = ""; return 0; } @@ -2906,6 +2904,7 @@ int ha_calpont_impl_rnd_end(TABLE* table) ci->warningMsg.clear(); // reset expressionId just in case ci->expressionId = 0; + return rc; } @@ -3243,16 +3242,11 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table) } //Log the statement to debug.log - LoggingID logid( 24, tid2sid(thd->thread_id), 0); - logging::Message::Args args1; - logging::Message msg(1); - args1.add("Start SQL statement: "); - ostringstream oss; - oss << idb_mysql_query_str(thd) << "; |" << table->s->db.str << "|"; - args1.add(oss.str()); - msg.format( args1 ); - Logger logger(logid.fSubsysID); - logger.logMessage(LOG_TYPE_DEBUG, msg, logid); + { + ostringstream oss; + oss << "Start SQL statement: " << idb_mysql_query_str(thd) << "; |" << table->s->db.str << "|"; + log_this(thd, oss.str().c_str(), logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id)); + } //start process cpimport mode 1 ci->mysqld_pid = getpid(); @@ -3290,13 +3284,8 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table) { setError(current_thd, ER_INTERNAL_ERROR, logging::IDBErrorInfo::instance()->errorMsg(ERR_LOCAL_QUERY_UM)); ci->singleInsert = true; - LoggingID logid( 24, tid2sid(thd->thread_id), 0); - logging::Message::Args args1; - logging::Message msg(1); - args1.add("End SQL statement"); - msg.format( args1 ); - Logger logger(logid.fSubsysID); - logger.logMessage(LOG_TYPE_DEBUG, msg, logid); + log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, + tid2sid(thd->thread_id)); return; } else @@ -3470,16 +3459,8 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table) errnum << "); " << errmsg; setError(current_thd, ER_INTERNAL_ERROR, oss.str()); ci->singleInsert = true; - LoggingID logid( 24, tid2sid(thd->thread_id), 0); - logging::Message::Args args1, args2; - logging::Message emsg(1), msg(1); - args1.add(oss.str()); - emsg.format( args1 ); - Logger logger(logid.fSubsysID); - logger.logMessage(LOG_TYPE_ERROR, emsg, logid); - args2.add("End SQL statement"); - msg.format( args2 ); - logger.logMessage(LOG_TYPE_DEBUG, msg, logid); + log_this(thd, oss.str(), logging::LOG_TYPE_ERROR, tid2sid(thd->thread_id)); + log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id)); return; } @@ -3500,13 +3481,7 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table) errnum << "); " << strerror(errnum); setError(current_thd, ER_INTERNAL_ERROR, oss.str()); ci->singleInsert = true; - LoggingID logid( 24, tid2sid(thd->thread_id), 0); - logging::Message::Args args1; - logging::Message msg(1); - args1.add("End SQL statement"); - msg.format( args1 ); - Logger logger(logid.fSubsysID); - logger.logMessage(LOG_TYPE_DEBUG, msg, logid); + log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id)); return; } @@ -3522,13 +3497,7 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table) errnum << "); " << strerror(errnum); setError(current_thd, ER_INTERNAL_ERROR, oss.str()); ci->singleInsert = true; - LoggingID logid( 24, tid2sid(thd->thread_id), 0); - logging::Message::Args args1; - logging::Message msg(1); - args1.add("End SQL statement"); - msg.format( args1 ); - Logger logger(logid.fSubsysID); - logger.logMessage(LOG_TYPE_DEBUG, msg, logid); + log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id)); return; } else if (aChPid == 0) // we are in child @@ -3565,13 +3534,8 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table) setError(current_thd, ER_INTERNAL_ERROR, "Forking process cpimport failed."); ci->singleInsert = true; - LoggingID logid( 24, tid2sid(thd->thread_id), 0); - logging::Message::Args args1; - logging::Message msg(1); - args1.add("End SQL statement"); - msg.format( args1 ); - Logger logger(logid.fSubsysID); - logger.logMessage(LOG_TYPE_DEBUG, msg, logid); + log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, + tid2sid(thd->thread_id)); exit(1); } else // parent @@ -3736,13 +3700,7 @@ int ha_calpont_impl_end_bulk_insert(bool abort, TABLE* table) FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, errnum, 0, errmsg, 512, NULL); ostringstream oss; oss << "GenerateConsoleCtrlEvent: (errno-" << errnum << "); " << errmsg; - LoggingID logid( 24, 0, 0); - logging::Message::Args args1; - logging::Message msg(1); - args1.add(oss.str()); - msg.format( args1 ); - Logger logger(logid.fSubsysID); - logger.logMessage(LOG_TYPE_DEBUG, msg, logid); + log_this(thd, oss.str(), logging::LOG_TYPE_DEBUG,0); } // Close handles to the cpimport process and its primary thread. @@ -3846,18 +3804,15 @@ int ha_calpont_impl_end_bulk_insert(bool abort, TABLE* table) } #endif - LoggingID logid( 24, tid2sid(thd->thread_id), 0); - logging::Message::Args args1; - logging::Message msg(1); - if ( rc == 0) - args1.add("End SQL statement"); + { + log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id)); + } else - args1.add("End SQL statement with error"); + { + log_this(thd, "End SQL statement with error", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id)); + } - msg.format( args1 ); - Logger logger(logid.fSubsysID); - logger.logMessage(LOG_TYPE_DEBUG, msg, logid); ci->columnTypes.clear(); //get extra warning count if any ifstream dmlFile; @@ -4183,26 +4138,7 @@ int ha_calpont_impl_external_lock(THD* thd, TABLE* table, int lock_type) if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) { - if (ci->cal_conn_hndl) - { - // send ExeMgr a signal before cloing the connection - ByteStream msg; - ByteStream::quadbyte qb = 0; - msg << qb; - - try - { - ci->cal_conn_hndl->exeMgr->write(msg); - } - catch (...) - { - // this is the end of the query. Ignore connetion failure. - } - - sm::sm_cleanup(ci->cal_conn_hndl); - ci->cal_conn_hndl = 0; - } - + force_close_fep_conn(thd, ci); return 0; } @@ -4357,26 +4293,7 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) { - if (ci->cal_conn_hndl) - { - // send ExeMgr a signal before closing the connection - ByteStream msg; - ByteStream::quadbyte qb = 0; - msg << qb; - - try - { - ci->cal_conn_hndl->exeMgr->write(msg); - } - catch (...) - { - // canceling query. ignore connection failure. - } - - sm::sm_cleanup(ci->cal_conn_hndl); - ci->cal_conn_hndl = 0; - } - + force_close_fep_conn(thd, ci); return 0; } @@ -4851,26 +4768,7 @@ int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE // @bug 3078 if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) { - if (ci->cal_conn_hndl) - { - // send ExeMgr a signal before cloing the connection - ByteStream msg; - ByteStream::quadbyte qb = 0; - msg << qb; - - try - { - ci->cal_conn_hndl->exeMgr->write(msg); - } - catch (...) - { - // cancel query. ignore connection failure. - } - - sm::sm_cleanup(ci->cal_conn_hndl); - ci->cal_conn_hndl = 0; - } - + force_close_fep_conn(thd, ci); return 0; } @@ -4932,7 +4830,6 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* THD* thd = current_thd; cal_connection_info* ci = NULL; - if (thd->slave_thread && ( thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT_SELECT || @@ -4950,37 +4847,11 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* if (get_fe_conn_info_ptr() != NULL) ci = reinterpret_cast(get_fe_conn_info_ptr()); - // MCOL-1052 - //if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ORDER_BY ) - //{ - // thd->infinidb_vtable.vtable_state = THD::INFINIDB_SELECT_VTABLE;// flip back to normal state - // return rc; - //} - - if (((thd->lex)->sql_command == SQLCOM_INSERT) || + if (((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) ) { - // @bug 4022. error handling for select part of dml - if (ci->cal_conn_hndl && ci->rc) - { - // send ExeMgr a signal before closing the connection - ByteStream msg; - ByteStream::quadbyte qb = 0; - msg << qb; - - try - { - ci->cal_conn_hndl->exeMgr->write(msg); - } - catch (...) - { - // this is error handling, so ignore connection failure. - } - - sm::sm_cleanup(ci->cal_conn_hndl); - ci->cal_conn_hndl = 0; - return rc; - } + force_close_fep_conn(thd, ci, true); // with checking prev command rc + return rc; } if (!ci) @@ -4995,33 +4866,15 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* (thd->lex)->sql_command != SQLCOM_INSERT_SELECT && thd->variables.select_limit != (uint64_t) - 1)) { - if (ci->cal_conn_hndl) + force_close_fep_conn(thd, ci); + // clear querystats because no query stats available for cancelled query + ci->queryStats = ""; + // Poping next ExeMgr connection out of the stack + if ( ci->cal_conn_hndl_st.size() ) { - // send ExeMgr a signal before closing the connection - ByteStream msg; - ByteStream::quadbyte qb = 0; - msg << qb; - - try - { - ci->cal_conn_hndl->exeMgr->write(msg); - } - catch (...) - { - // this is the end of query. Ignore the exception if exemgr connection failed - // for whatever reason. - } - - sm::sm_cleanup(ci->cal_conn_hndl); - ci->cal_conn_hndl = 0; - // clear querystats because no query stats available for cancelled query - ci->queryStats = ""; + ci->cal_conn_hndl_st.pop(); if ( ci->cal_conn_hndl_st.size() ) - { - ci->cal_conn_hndl_st.pop(); - if ( ci->cal_conn_hndl_st.size() ) - ci->cal_conn_hndl = ci->cal_conn_hndl_st.top(); - } + ci->cal_conn_hndl = ci->cal_conn_hndl_st.top(); } return 0;