From fb5ba84212fb3250d0e48008265bae72d334970b Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Tue, 6 Jul 2021 17:14:49 +0000 Subject: [PATCH] MCOL-4802 Removed ByteStream methods for bool manipulations and add some logging into I_S.columnstore_files --- dbcon/execplan/calpontsystemcatalog.h | 7 ++- dbcon/joblist/batchprimitiveprocessor-jl.cpp | 2 +- dbcon/mysql/ha_mcs_impl.cpp | 49 +++++------------ dbcon/mysql/ha_mcs_logging.h | 53 +++++++++++++++++++ dbcon/mysql/is_columnstore_files.cpp | 12 ++++- .../primproc/batchprimitiveprocessor.cpp | 3 +- utils/messageqcpp/bytestream.cpp | 25 --------- utils/messageqcpp/bytestream.h | 3 -- 8 files changed, 81 insertions(+), 73 deletions(-) create mode 100644 dbcon/mysql/ha_mcs_logging.h diff --git a/dbcon/execplan/calpontsystemcatalog.h b/dbcon/execplan/calpontsystemcatalog.h index c3ab5d719..4956e3801 100644 --- a/dbcon/execplan/calpontsystemcatalog.h +++ b/dbcon/execplan/calpontsystemcatalog.h @@ -920,13 +920,12 @@ public: bs << (uint8_t) scale; bs << (uint8_t) compressionType; bs << (uint32_t) charsetNumber; - bs << (bool) mIsDict; + bs << (uint8_t) mIsDict; } void unserialize(messageqcpp::ByteStream& bs) { uint8_t tmp8; uint32_t tmp32; - bool tmpBool; bs >> tmp8; colDataType = (execplan::CalpontSystemCatalog::ColDataType) tmp8; bs >> tmp8; @@ -938,8 +937,8 @@ public: bs >> tmp32; charsetNumber = tmp32; mCharset = nullptr; - bs >> tmpBool; - mIsDict = tmpBool; + bs >> tmp8; + mIsDict = static_cast(tmp8); } }; diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.cpp b/dbcon/joblist/batchprimitiveprocessor-jl.cpp index 9d3aba526..edf6c4a84 100644 --- a/dbcon/joblist/batchprimitiveprocessor-jl.cpp +++ b/dbcon/joblist/batchprimitiveprocessor-jl.cpp @@ -1127,7 +1127,7 @@ void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const // MCOL-4173 Notify PP if smallSide and largeSide have different column widths // and send smallSide RG to PP. bool joinHasSkewedKeyColumn = tJoiners[i]->joinHasSkewedKeyColumn(); - bs << joinHasSkewedKeyColumn; + bs << (uint8_t) joinHasSkewedKeyColumn; if (!smallSideRGSent && joinHasSkewedKeyColumn) { idbassert(!smallSideRGs.empty()); diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index 728429e29..4dff702ab 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -144,6 +144,7 @@ using namespace funcexp; #include "ha_mcs_datatype.h" #include "statistics.h" +#include "ha_mcs_logging.h" namespace cal_impl_if { @@ -179,32 +180,6 @@ inline uint32_t tid2sid(const uint32_t tid) return CalpontSystemCatalog::idb_tid2sid(tid); } - -/** - @brief - Wrapper around logging facility. - - @details - Reduces the boiler plate code. - - Called from number of places(mostly DML) in - ha_mcs_impl.cpp(). -*/ -void log_this(THD *thd, const char *message, - logging::LOG_TYPE log_type, unsigned sid) -{ - // corresponds with dbcon in SubsystemID vector - // in messagelog.cpp - unsigned int subSystemId = 24; - logging::LoggingID logid( subSystemId, sid, 0); - logging::Message::Args args1; - logging::Message msg(1); - args1.add(message); - msg.format( args1 ); - Logger logger(logid.fSubsysID); - logger.logMessage(log_type, msg, logid); -} - /** @brief Forcely close a FEP connection. @@ -247,7 +222,7 @@ void force_close_fep_conn(THD *thd, cal_connection_info* ci, bool check_prev_rc catch (...) { // Add details into the message. - log_this(thd, "Exception in force_close_fep_conn().", + ha_mcs_impl::log_this(thd, "Exception in force_close_fep_conn().", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id)); } @@ -3129,7 +3104,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins { ostringstream oss; oss << "Start SQL statement: " << idb_mysql_query_str(thd) << "; |" << table->s->db.str << "|"; - log_this(thd, oss.str().c_str(), logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id)); + ha_mcs_impl::log_this(thd, oss.str().c_str(), logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id)); } //start process cpimport mode 1 @@ -3168,7 +3143,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins { setError(current_thd, ER_INTERNAL_ERROR, logging::IDBErrorInfo::instance()->errorMsg(ERR_LOCAL_QUERY_UM)); ci->singleInsert = true; - log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, + ha_mcs_impl::log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id)); return; } @@ -3325,8 +3300,8 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins errnum << "); " << errmsg; setError(current_thd, ER_INTERNAL_ERROR, oss.str()); ci->singleInsert = true; - log_this(thd, oss.str(), logging::LOG_TYPE_ERROR, tid2sid(thd->thread_id)); - log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id)); + ha_mcs_impl::log_this(thd, oss.str(), logging::LOG_TYPE_ERROR, tid2sid(thd->thread_id)); + ha_mcs_impl::log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id)); return; } @@ -3347,7 +3322,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins errnum << "); " << strerror(errnum); setError(current_thd, ER_INTERNAL_ERROR, oss.str()); ci->singleInsert = true; - log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id)); + ha_mcs_impl::log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id)); return; } @@ -3363,7 +3338,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins errnum << "); " << strerror(errnum); setError(current_thd, ER_INTERNAL_ERROR, oss.str()); ci->singleInsert = true; - log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id)); + ha_mcs_impl::log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id)); return; } else if (aChPid == 0) // we are in child @@ -3400,7 +3375,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins setError(current_thd, ER_INTERNAL_ERROR, "Forking process cpimport failed."); ci->singleInsert = true; - log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, + ha_mcs_impl::log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id)); exit(1); } @@ -3567,7 +3542,7 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table) FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, errnum, 0, errmsg, 512, NULL); ostringstream oss; oss << "GenerateConsoleCtrlEvent: (errno-" << errnum << "); " << errmsg; - log_this(thd, oss.str(), logging::LOG_TYPE_DEBUG,0); + ha_mcs_impl::log_this(thd, oss.str(), logging::LOG_TYPE_DEBUG,0); } // Close handles to the cpimport process and its primary thread. @@ -3673,11 +3648,11 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table) #endif if ( rc == 0) { - log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id)); + ha_mcs_impl::log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id)); } else { - log_this(thd, "End SQL statement with error", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id)); + ha_mcs_impl::log_this(thd, "End SQL statement with error", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id)); } ci->columnTypes.clear(); diff --git a/dbcon/mysql/ha_mcs_logging.h b/dbcon/mysql/ha_mcs_logging.h new file mode 100644 index 000000000..f3bcf108d --- /dev/null +++ b/dbcon/mysql/ha_mcs_logging.h @@ -0,0 +1,53 @@ +/* Copyright (C) 2021 MariaDB Corporation + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; version 2 of + * the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + * MA 02110-1301, USA. + */ + +#ifndef HA_MCS_LOGGING_H__ +#define HA_MCS_LOGGING_H__ + +#include "messagelog.h" + +/** + @brief + Wrapper around logging facility. + + @details + Reduces the boiler plate code. + + Called from number of places(mostly DML) in + ha_mcs_impl.cpp(). +*/ +namespace ha_mcs_impl +{ +inline void log_this(THD *thd, const char *message, + logging::LOG_TYPE log_type, unsigned sid) +{ + // corresponds with dbcon in SubsystemID vector + // in messagelog.cpp + unsigned int subSystemId = 24; + logging::LoggingID logid( subSystemId, sid, 0); + logging::Message::Args args1; + logging::Message msg(1); + args1.add(message); + msg.format( args1 ); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(log_type, msg, logid); +} + +}; + +#endif diff --git a/dbcon/mysql/is_columnstore_files.cpp b/dbcon/mysql/is_columnstore_files.cpp index bbaa5293f..91cb4e83f 100644 --- a/dbcon/mysql/is_columnstore_files.cpp +++ b/dbcon/mysql/is_columnstore_files.cpp @@ -37,6 +37,7 @@ #include "messagequeuepool.h" #include "we_messages.h" #include "is_columnstore.h" +#include "ha_mcs_logging.h" // Required declaration as it isn't in a MairaDB include bool schema_table_store_record(THD* thd, TABLE* table); @@ -52,7 +53,7 @@ ST_FIELD_INFO is_columnstore_files_fields[] = Show::CEnd() }; -static bool get_file_sizes(messageqcpp::MessageQueueClient* msgQueueClient, const char* fileName, off_t* fileSize, off_t* compressedFileSize) +static bool get_file_sizes(THD* thd, messageqcpp::MessageQueueClient* msgQueueClient, const char* fileName, off_t* fileSize, off_t* compressedFileSize) { messageqcpp::ByteStream bs; messageqcpp::ByteStream::byte rc; @@ -76,6 +77,13 @@ static bool get_file_sizes(messageqcpp::MessageQueueClient* msgQueueClient, cons *sbs >> rc; *sbs >> errMsg; + if (rc) + { + ha_mcs_impl::log_this(thd, + "I_S::COLUMNSTORE_FILE::get_file_sizes(): WriteEngineServer returns an error().", + logging::LOG_TYPE_ERROR, thd->thread_id); + return false; + } *sbs >> *fileSize; *sbs >> *compressedFileSize; return true; @@ -161,7 +169,7 @@ static int generate_result(BRM::OID_t oid, BRM::DBRM* emp, TABLE* table, THD* th // snprintf output truncation check if (rc == WriteEngine::FILE_NAME_SIZE || - !get_file_sizes(msgQueueClient, fullFileName, &fileSize, &compressedFileSize)) + !get_file_sizes(thd, msgQueueClient, fullFileName, &fileSize, &compressedFileSize)) { messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient); delete emp; diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index bfa10fbe1..0ca77129c 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -364,7 +364,8 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) { deserializeVector(bs, tlLargeSideKeyColumns[i]); bs >> tlSmallSideKeyLengths[i]; - bs >> mJOINHasSkewedKeyColumn; + bs >> tmp8; + mJOINHasSkewedKeyColumn = (bool) tmp8; // Deser smallSideRG if key data types are different, e.g. INT vs wide-DECIMAL. if (mJOINHasSkewedKeyColumn && !smallSideRGRecvd) { diff --git a/utils/messageqcpp/bytestream.cpp b/utils/messageqcpp/bytestream.cpp index 1b898aa0b..bef473b1c 100644 --- a/utils/messageqcpp/bytestream.cpp +++ b/utils/messageqcpp/bytestream.cpp @@ -170,14 +170,6 @@ ByteStream& ByteStream::operator<<(const uint8_t b) return *this; } -ByteStream& ByteStream::operator<<(const bool b) -{ - add(b); - - return *this; -} - - ByteStream& ByteStream::operator<<(const int16_t d) { if (fBuf == 0 || (fCurInPtr - fBuf + 2U > fMaxLen + ISSOverhead)) @@ -304,14 +296,6 @@ ByteStream& ByteStream::operator>>(uint8_t& b) return *this; } -ByteStream& ByteStream::operator>>(bool& b) -{ - peek(b); - fCurOutPtr++; - return *this; -} - - ByteStream& ByteStream::operator>>(int16_t& d) { peek(d); @@ -398,15 +382,6 @@ void ByteStream::peek(uint8_t& b) const b = *((int8_t*)fCurOutPtr); } -void ByteStream::peek(bool& b) const -{ - if (length() < 1) - throw underflow_error("ByteStream::peek(bool): not enough data in stream to fill datatype"); - - b = *((bool*)fCurOutPtr); -} - - void ByteStream::peek(int16_t& d) const { if (length() < 2) diff --git a/utils/messageqcpp/bytestream.h b/utils/messageqcpp/bytestream.h index 4cf1b28c1..970ff9423 100644 --- a/utils/messageqcpp/bytestream.h +++ b/utils/messageqcpp/bytestream.h @@ -113,7 +113,6 @@ public: * push a uint8_t onto the end of the stream */ EXPORT ByteStream& operator<<(const uint8_t b); - EXPORT ByteStream& operator<<(const bool b); /** * push a int16_t onto the end of the stream. The byte order is whatever the native byte order is. */ @@ -196,7 +195,6 @@ public: * extract a uint8_t from the front of the stream. */ EXPORT ByteStream& operator>>(uint8_t& b); - EXPORT ByteStream& operator>>(bool& b); /** * extract a int16_t from the front of the stream. The byte order is whatever the native byte order is. */ @@ -275,7 +273,6 @@ public: * Peek at a uint8_t from the front of the stream. */ EXPORT void peek(uint8_t& b) const; - EXPORT void peek(bool& b) const; /** * Peek at a int16_t from the front of the stream. The byte order is whatever the native byte order is. */