1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-3247 Add two utility functions to simplify the code:

to log the messages and to forcely close the FEP connection.
This commit is contained in:
Roman Nozdrin
2019-04-15 12:37:04 +03:00
parent 11bec446b4
commit 34b1d44563

View File

@ -179,6 +179,82 @@ inline uint32_t tid2sid(const uint32_t tid)
return CalpontSystemCatalog::idb_tid2sid(tid); return CalpontSystemCatalog::idb_tid2sid(tid);
} }
/**
@brief
Wrapper around logging facility.
@details
Reduces the boiler plate code.
Called from number of places(mostly DML) in
ha_calpont_impl.cpp().
*/
void log_this(THD *thd, const char *message,
logging::LOG_TYPE log_type, unsigned sid)
{
// corresponds with dbcon in SubsystemID vector
// in messagelog.cpp
unsigned int subSystemId = 24;
logging::LoggingID logid( subSystemId, sid, 0);
logging::Message::Args args1;
logging::Message msg(1);
args1.add(message);
msg.format( args1 );
Logger logger(logid.fSubsysID);
logger.logMessage(log_type, msg, logid);
}
/**
@brief
Forcely close a FEP connection.
@details
Plugin code opens network connection with ExMgr to
get:
the result of meta-data queries
the result of DML or DQL query in any mode
statistics
This code allows to explicitly close the connection
if any error happens using a non-existing protocol
code 0. This causes ExeMgr loop to drop the
connection.
Called from many places in ha_calpont_impl.cpp().
*/
void force_close_fep_conn(THD *thd, cal_connection_info* ci, bool check_prev_rc = false)
{
if (!ci->cal_conn_hndl)
{
return;
}
if(check_prev_rc && !ci->rc)
{
return;
}
// send ExeMgr an unknown signal to force him to close
// the connection
ByteStream msg;
ByteStream::quadbyte qb = 0;
msg << qb;
try
{
ci->cal_conn_hndl->exeMgr->write(msg);
}
catch (...)
{
// Add details into the message.
log_this(thd, "Exception in force_close_fep_conn().",
logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
}
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
void storeNumericField(Field** f, int64_t value, CalpontSystemCatalog::ColType& ct) void storeNumericField(Field** f, int64_t value, CalpontSystemCatalog::ColType& ct)
{ {
// unset null bit first // unset null bit first
@ -2119,26 +2195,7 @@ int ha_calpont_impl_rnd_init(TABLE* table)
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
{ {
if (ci->cal_conn_hndl) force_close_fep_conn(thd, ci);
{
// send ExeMgr a signal before closing the connection
ByteStream msg;
ByteStream::quadbyte qb = 0;
msg << qb;
try
{
ci->cal_conn_hndl->exeMgr->write(msg);
}
catch (...)
{
// canceling query. ignore connection failure.
}
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
return 0; return 0;
} }
@ -2650,26 +2707,7 @@ int ha_calpont_impl_rnd_next(uchar* buf, TABLE* table)
// @bug 3078 // @bug 3078
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
{ {
if (ci->cal_conn_hndl) force_close_fep_conn(thd, ci);
{
// send ExeMgr a signal before cloing the connection
ByteStream msg;
ByteStream::quadbyte qb = 0;
msg << qb;
try
{
ci->cal_conn_hndl->exeMgr->write(msg);
}
catch (...)
{
// cancel query. ignore connection failure.
}
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
return 0; return 0;
} }
@ -2767,27 +2805,7 @@ int ha_calpont_impl_rnd_end(TABLE* table)
if (((thd->lex)->sql_command == SQLCOM_INSERT) || if (((thd->lex)->sql_command == SQLCOM_INSERT) ||
((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) ) ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) )
{ {
// @bug 4022. error handling for select part of dml force_close_fep_conn(thd, ci, true); // checking prev command rc
if (ci->cal_conn_hndl && ci->rc)
{
// send ExeMgr a signal before closing the connection
ByteStream msg;
ByteStream::quadbyte qb = 0;
msg << qb;
try
{
ci->cal_conn_hndl->exeMgr->write(msg);
}
catch (...)
{
// this is error handling, so ignore connection failure.
}
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
return rc;
}
} }
if (!ci) if (!ci)
@ -2802,29 +2820,9 @@ int ha_calpont_impl_rnd_end(TABLE* table)
(thd->lex)->sql_command != SQLCOM_INSERT_SELECT && (thd->lex)->sql_command != SQLCOM_INSERT_SELECT &&
thd->variables.select_limit != (uint64_t) - 1)) thd->variables.select_limit != (uint64_t) - 1))
{ {
if (ci->cal_conn_hndl) force_close_fep_conn(thd, ci);
{ // clear querystats because no query stats available for cancelled query
// send ExeMgr a signal before closing the connection ci->queryStats = "";
ByteStream msg;
ByteStream::quadbyte qb = 0;
msg << qb;
try
{
ci->cal_conn_hndl->exeMgr->write(msg);
}
catch (...)
{
// this is the end of query. Ignore the exception if exemgr connection failed
// for whatever reason.
}
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
// clear querystats because no query stats available for cancelled query
ci->queryStats = "";
}
return 0; return 0;
} }
@ -2906,6 +2904,7 @@ int ha_calpont_impl_rnd_end(TABLE* table)
ci->warningMsg.clear(); ci->warningMsg.clear();
// reset expressionId just in case // reset expressionId just in case
ci->expressionId = 0; ci->expressionId = 0;
return rc; return rc;
} }
@ -3243,16 +3242,11 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table)
} }
//Log the statement to debug.log //Log the statement to debug.log
LoggingID logid( 24, tid2sid(thd->thread_id), 0); {
logging::Message::Args args1; ostringstream oss;
logging::Message msg(1); oss << "Start SQL statement: " << idb_mysql_query_str(thd) << "; |" << table->s->db.str << "|";
args1.add("Start SQL statement: "); log_this(thd, oss.str().c_str(), logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
ostringstream oss; }
oss << idb_mysql_query_str(thd) << "; |" << table->s->db.str << "|";
args1.add(oss.str());
msg.format( args1 );
Logger logger(logid.fSubsysID);
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
//start process cpimport mode 1 //start process cpimport mode 1
ci->mysqld_pid = getpid(); ci->mysqld_pid = getpid();
@ -3290,13 +3284,8 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table)
{ {
setError(current_thd, ER_INTERNAL_ERROR, logging::IDBErrorInfo::instance()->errorMsg(ERR_LOCAL_QUERY_UM)); setError(current_thd, ER_INTERNAL_ERROR, logging::IDBErrorInfo::instance()->errorMsg(ERR_LOCAL_QUERY_UM));
ci->singleInsert = true; ci->singleInsert = true;
LoggingID logid( 24, tid2sid(thd->thread_id), 0); log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG,
logging::Message::Args args1; tid2sid(thd->thread_id));
logging::Message msg(1);
args1.add("End SQL statement");
msg.format( args1 );
Logger logger(logid.fSubsysID);
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
return; return;
} }
else else
@ -3470,16 +3459,8 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table)
errnum << "); " << errmsg; errnum << "); " << errmsg;
setError(current_thd, ER_INTERNAL_ERROR, oss.str()); setError(current_thd, ER_INTERNAL_ERROR, oss.str());
ci->singleInsert = true; ci->singleInsert = true;
LoggingID logid( 24, tid2sid(thd->thread_id), 0); log_this(thd, oss.str(), logging::LOG_TYPE_ERROR, tid2sid(thd->thread_id));
logging::Message::Args args1, args2; log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
logging::Message emsg(1), msg(1);
args1.add(oss.str());
emsg.format( args1 );
Logger logger(logid.fSubsysID);
logger.logMessage(LOG_TYPE_ERROR, emsg, logid);
args2.add("End SQL statement");
msg.format( args2 );
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
return; return;
} }
@ -3500,13 +3481,7 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table)
errnum << "); " << strerror(errnum); errnum << "); " << strerror(errnum);
setError(current_thd, ER_INTERNAL_ERROR, oss.str()); setError(current_thd, ER_INTERNAL_ERROR, oss.str());
ci->singleInsert = true; ci->singleInsert = true;
LoggingID logid( 24, tid2sid(thd->thread_id), 0); log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
logging::Message::Args args1;
logging::Message msg(1);
args1.add("End SQL statement");
msg.format( args1 );
Logger logger(logid.fSubsysID);
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
return; return;
} }
@ -3522,13 +3497,7 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table)
errnum << "); " << strerror(errnum); errnum << "); " << strerror(errnum);
setError(current_thd, ER_INTERNAL_ERROR, oss.str()); setError(current_thd, ER_INTERNAL_ERROR, oss.str());
ci->singleInsert = true; ci->singleInsert = true;
LoggingID logid( 24, tid2sid(thd->thread_id), 0); log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
logging::Message::Args args1;
logging::Message msg(1);
args1.add("End SQL statement");
msg.format( args1 );
Logger logger(logid.fSubsysID);
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
return; return;
} }
else if (aChPid == 0) // we are in child else if (aChPid == 0) // we are in child
@ -3565,13 +3534,8 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table)
setError(current_thd, ER_INTERNAL_ERROR, "Forking process cpimport failed."); setError(current_thd, ER_INTERNAL_ERROR, "Forking process cpimport failed.");
ci->singleInsert = true; ci->singleInsert = true;
LoggingID logid( 24, tid2sid(thd->thread_id), 0); log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG,
logging::Message::Args args1; tid2sid(thd->thread_id));
logging::Message msg(1);
args1.add("End SQL statement");
msg.format( args1 );
Logger logger(logid.fSubsysID);
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
exit(1); exit(1);
} }
else // parent else // parent
@ -3736,13 +3700,7 @@ int ha_calpont_impl_end_bulk_insert(bool abort, TABLE* table)
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, errnum, 0, errmsg, 512, NULL); FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, errnum, 0, errmsg, 512, NULL);
ostringstream oss; ostringstream oss;
oss << "GenerateConsoleCtrlEvent: (errno-" << errnum << "); " << errmsg; oss << "GenerateConsoleCtrlEvent: (errno-" << errnum << "); " << errmsg;
LoggingID logid( 24, 0, 0); log_this(thd, oss.str(), logging::LOG_TYPE_DEBUG,0);
logging::Message::Args args1;
logging::Message msg(1);
args1.add(oss.str());
msg.format( args1 );
Logger logger(logid.fSubsysID);
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
} }
// Close handles to the cpimport process and its primary thread. // Close handles to the cpimport process and its primary thread.
@ -3846,18 +3804,15 @@ int ha_calpont_impl_end_bulk_insert(bool abort, TABLE* table)
} }
#endif #endif
LoggingID logid( 24, tid2sid(thd->thread_id), 0);
logging::Message::Args args1;
logging::Message msg(1);
if ( rc == 0) if ( rc == 0)
args1.add("End SQL statement"); {
log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
}
else else
args1.add("End SQL statement with error"); {
log_this(thd, "End SQL statement with error", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
}
msg.format( args1 );
Logger logger(logid.fSubsysID);
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
ci->columnTypes.clear(); ci->columnTypes.clear();
//get extra warning count if any //get extra warning count if any
ifstream dmlFile; ifstream dmlFile;
@ -4183,26 +4138,7 @@ int ha_calpont_impl_external_lock(THD* thd, TABLE* table, int lock_type)
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
{ {
if (ci->cal_conn_hndl) force_close_fep_conn(thd, ci);
{
// send ExeMgr a signal before cloing the connection
ByteStream msg;
ByteStream::quadbyte qb = 0;
msg << qb;
try
{
ci->cal_conn_hndl->exeMgr->write(msg);
}
catch (...)
{
// this is the end of the query. Ignore connetion failure.
}
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
return 0; return 0;
} }
@ -4357,26 +4293,7 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
{ {
if (ci->cal_conn_hndl) force_close_fep_conn(thd, ci);
{
// send ExeMgr a signal before closing the connection
ByteStream msg;
ByteStream::quadbyte qb = 0;
msg << qb;
try
{
ci->cal_conn_hndl->exeMgr->write(msg);
}
catch (...)
{
// canceling query. ignore connection failure.
}
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
return 0; return 0;
} }
@ -4851,26 +4768,7 @@ int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE
// @bug 3078 // @bug 3078
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
{ {
if (ci->cal_conn_hndl) force_close_fep_conn(thd, ci);
{
// send ExeMgr a signal before cloing the connection
ByteStream msg;
ByteStream::quadbyte qb = 0;
msg << qb;
try
{
ci->cal_conn_hndl->exeMgr->write(msg);
}
catch (...)
{
// cancel query. ignore connection failure.
}
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
return 0; return 0;
} }
@ -4932,7 +4830,6 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
THD* thd = current_thd; THD* thd = current_thd;
cal_connection_info* ci = NULL; cal_connection_info* ci = NULL;
if (thd->slave_thread && ( if (thd->slave_thread && (
thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
@ -4950,37 +4847,11 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
if (get_fe_conn_info_ptr() != NULL) if (get_fe_conn_info_ptr() != NULL)
ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr()); ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
// MCOL-1052 if (((thd->lex)->sql_command == SQLCOM_INSERT) ||
//if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ORDER_BY )
//{
// thd->infinidb_vtable.vtable_state = THD::INFINIDB_SELECT_VTABLE;// flip back to normal state
// return rc;
//}
if (((thd->lex)->sql_command == SQLCOM_INSERT) ||
((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) ) ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) )
{ {
// @bug 4022. error handling for select part of dml force_close_fep_conn(thd, ci, true); // with checking prev command rc
if (ci->cal_conn_hndl && ci->rc) return rc;
{
// send ExeMgr a signal before closing the connection
ByteStream msg;
ByteStream::quadbyte qb = 0;
msg << qb;
try
{
ci->cal_conn_hndl->exeMgr->write(msg);
}
catch (...)
{
// this is error handling, so ignore connection failure.
}
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
return rc;
}
} }
if (!ci) if (!ci)
@ -4995,33 +4866,15 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
(thd->lex)->sql_command != SQLCOM_INSERT_SELECT && (thd->lex)->sql_command != SQLCOM_INSERT_SELECT &&
thd->variables.select_limit != (uint64_t) - 1)) thd->variables.select_limit != (uint64_t) - 1))
{ {
if (ci->cal_conn_hndl) force_close_fep_conn(thd, ci);
// clear querystats because no query stats available for cancelled query
ci->queryStats = "";
// Poping next ExeMgr connection out of the stack
if ( ci->cal_conn_hndl_st.size() )
{ {
// send ExeMgr a signal before closing the connection ci->cal_conn_hndl_st.pop();
ByteStream msg;
ByteStream::quadbyte qb = 0;
msg << qb;
try
{
ci->cal_conn_hndl->exeMgr->write(msg);
}
catch (...)
{
// this is the end of query. Ignore the exception if exemgr connection failed
// for whatever reason.
}
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
// clear querystats because no query stats available for cancelled query
ci->queryStats = "";
if ( ci->cal_conn_hndl_st.size() ) if ( ci->cal_conn_hndl_st.size() )
{ ci->cal_conn_hndl = ci->cal_conn_hndl_st.top();
ci->cal_conn_hndl_st.pop();
if ( ci->cal_conn_hndl_st.size() )
ci->cal_conn_hndl = ci->cal_conn_hndl_st.top();
}
} }
return 0; return 0;