1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-4802 Removed ByteStream methods for bool manipulations and add some logging into I_S.columnstore_files

This commit is contained in:
Roman Nozdrin
2021-07-06 17:14:49 +00:00
committed by Roman Nozdrin
parent b9bd207d3b
commit fb5ba84212
8 changed files with 81 additions and 73 deletions

View File

@ -920,13 +920,12 @@ public:
bs << (uint8_t) scale;
bs << (uint8_t) compressionType;
bs << (uint32_t) charsetNumber;
bs << (bool) mIsDict;
bs << (uint8_t) mIsDict;
}
void unserialize(messageqcpp::ByteStream& bs)
{
uint8_t tmp8;
uint32_t tmp32;
bool tmpBool;
bs >> tmp8;
colDataType = (execplan::CalpontSystemCatalog::ColDataType) tmp8;
bs >> tmp8;
@ -938,8 +937,8 @@ public:
bs >> tmp32;
charsetNumber = tmp32;
mCharset = nullptr;
bs >> tmpBool;
mIsDict = tmpBool;
bs >> tmp8;
mIsDict = static_cast<bool>(tmp8);
}
};

View File

@ -1127,7 +1127,7 @@ void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const
// MCOL-4173 Notify PP if smallSide and largeSide have different column widths
// and send smallSide RG to PP.
bool joinHasSkewedKeyColumn = tJoiners[i]->joinHasSkewedKeyColumn();
bs << joinHasSkewedKeyColumn;
bs << (uint8_t) joinHasSkewedKeyColumn;
if (!smallSideRGSent && joinHasSkewedKeyColumn)
{
idbassert(!smallSideRGs.empty());

View File

@ -144,6 +144,7 @@ using namespace funcexp;
#include "ha_mcs_datatype.h"
#include "statistics.h"
#include "ha_mcs_logging.h"
namespace cal_impl_if
{
@ -179,32 +180,6 @@ inline uint32_t tid2sid(const uint32_t tid)
return CalpontSystemCatalog::idb_tid2sid(tid);
}
/**
@brief
Wrapper around logging facility.
@details
Reduces the boiler plate code.
Called from number of places(mostly DML) in
ha_mcs_impl.cpp().
*/
void log_this(THD *thd, const char *message,
logging::LOG_TYPE log_type, unsigned sid)
{
// corresponds with dbcon in SubsystemID vector
// in messagelog.cpp
unsigned int subSystemId = 24;
logging::LoggingID logid( subSystemId, sid, 0);
logging::Message::Args args1;
logging::Message msg(1);
args1.add(message);
msg.format( args1 );
Logger logger(logid.fSubsysID);
logger.logMessage(log_type, msg, logid);
}
/**
@brief
Forcely close a FEP connection.
@ -247,7 +222,7 @@ void force_close_fep_conn(THD *thd, cal_connection_info* ci, bool check_prev_rc
catch (...)
{
// Add details into the message.
log_this(thd, "Exception in force_close_fep_conn().",
ha_mcs_impl::log_this(thd, "Exception in force_close_fep_conn().",
logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
}
@ -3129,7 +3104,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
{
ostringstream oss;
oss << "Start SQL statement: " << idb_mysql_query_str(thd) << "; |" << table->s->db.str << "|";
log_this(thd, oss.str().c_str(), logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
ha_mcs_impl::log_this(thd, oss.str().c_str(), logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
}
//start process cpimport mode 1
@ -3168,7 +3143,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
{
setError(current_thd, ER_INTERNAL_ERROR, logging::IDBErrorInfo::instance()->errorMsg(ERR_LOCAL_QUERY_UM));
ci->singleInsert = true;
log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG,
ha_mcs_impl::log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG,
tid2sid(thd->thread_id));
return;
}
@ -3325,8 +3300,8 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
errnum << "); " << errmsg;
setError(current_thd, ER_INTERNAL_ERROR, oss.str());
ci->singleInsert = true;
log_this(thd, oss.str(), logging::LOG_TYPE_ERROR, tid2sid(thd->thread_id));
log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
ha_mcs_impl::log_this(thd, oss.str(), logging::LOG_TYPE_ERROR, tid2sid(thd->thread_id));
ha_mcs_impl::log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
return;
}
@ -3347,7 +3322,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
errnum << "); " << strerror(errnum);
setError(current_thd, ER_INTERNAL_ERROR, oss.str());
ci->singleInsert = true;
log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
ha_mcs_impl::log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
return;
}
@ -3363,7 +3338,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
errnum << "); " << strerror(errnum);
setError(current_thd, ER_INTERNAL_ERROR, oss.str());
ci->singleInsert = true;
log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
ha_mcs_impl::log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
return;
}
else if (aChPid == 0) // we are in child
@ -3400,7 +3375,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
setError(current_thd, ER_INTERNAL_ERROR, "Forking process cpimport failed.");
ci->singleInsert = true;
log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG,
ha_mcs_impl::log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG,
tid2sid(thd->thread_id));
exit(1);
}
@ -3567,7 +3542,7 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, errnum, 0, errmsg, 512, NULL);
ostringstream oss;
oss << "GenerateConsoleCtrlEvent: (errno-" << errnum << "); " << errmsg;
log_this(thd, oss.str(), logging::LOG_TYPE_DEBUG,0);
ha_mcs_impl::log_this(thd, oss.str(), logging::LOG_TYPE_DEBUG,0);
}
// Close handles to the cpimport process and its primary thread.
@ -3673,11 +3648,11 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
#endif
if ( rc == 0)
{
log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
ha_mcs_impl::log_this(thd, "End SQL statement", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
}
else
{
log_this(thd, "End SQL statement with error", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
ha_mcs_impl::log_this(thd, "End SQL statement with error", logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
}
ci->columnTypes.clear();

View File

@ -0,0 +1,53 @@
/* Copyright (C) 2021 MariaDB Corporation
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; version 2 of
* the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
* MA 02110-1301, USA.
*/
#ifndef HA_MCS_LOGGING_H__
#define HA_MCS_LOGGING_H__
#include "messagelog.h"
/**
@brief
Wrapper around logging facility.
@details
Reduces the boiler plate code.
Called from number of places(mostly DML) in
ha_mcs_impl.cpp().
*/
namespace ha_mcs_impl
{
inline void log_this(THD *thd, const char *message,
logging::LOG_TYPE log_type, unsigned sid)
{
// corresponds with dbcon in SubsystemID vector
// in messagelog.cpp
unsigned int subSystemId = 24;
logging::LoggingID logid( subSystemId, sid, 0);
logging::Message::Args args1;
logging::Message msg(1);
args1.add(message);
msg.format( args1 );
logging::Logger logger(logid.fSubsysID);
logger.logMessage(log_type, msg, logid);
}
};
#endif

View File

@ -37,6 +37,7 @@
#include "messagequeuepool.h"
#include "we_messages.h"
#include "is_columnstore.h"
#include "ha_mcs_logging.h"
// Required declaration as it isn't in a MairaDB include
bool schema_table_store_record(THD* thd, TABLE* table);
@ -52,7 +53,7 @@ ST_FIELD_INFO is_columnstore_files_fields[] =
Show::CEnd()
};
static bool get_file_sizes(messageqcpp::MessageQueueClient* msgQueueClient, const char* fileName, off_t* fileSize, off_t* compressedFileSize)
static bool get_file_sizes(THD* thd, messageqcpp::MessageQueueClient* msgQueueClient, const char* fileName, off_t* fileSize, off_t* compressedFileSize)
{
messageqcpp::ByteStream bs;
messageqcpp::ByteStream::byte rc;
@ -76,6 +77,13 @@ static bool get_file_sizes(messageqcpp::MessageQueueClient* msgQueueClient, cons
*sbs >> rc;
*sbs >> errMsg;
if (rc)
{
ha_mcs_impl::log_this(thd,
"I_S::COLUMNSTORE_FILE::get_file_sizes(): WriteEngineServer returns an error().",
logging::LOG_TYPE_ERROR, thd->thread_id);
return false;
}
*sbs >> *fileSize;
*sbs >> *compressedFileSize;
return true;
@ -161,7 +169,7 @@ static int generate_result(BRM::OID_t oid, BRM::DBRM* emp, TABLE* table, THD* th
// snprintf output truncation check
if (rc == WriteEngine::FILE_NAME_SIZE ||
!get_file_sizes(msgQueueClient, fullFileName, &fileSize, &compressedFileSize))
!get_file_sizes(thd, msgQueueClient, fullFileName, &fileSize, &compressedFileSize))
{
messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient);
delete emp;

View File

@ -364,7 +364,8 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
{
deserializeVector<uint32_t>(bs, tlLargeSideKeyColumns[i]);
bs >> tlSmallSideKeyLengths[i];
bs >> mJOINHasSkewedKeyColumn;
bs >> tmp8;
mJOINHasSkewedKeyColumn = (bool) tmp8;
// Deser smallSideRG if key data types are different, e.g. INT vs wide-DECIMAL.
if (mJOINHasSkewedKeyColumn && !smallSideRGRecvd)
{

View File

@ -170,14 +170,6 @@ ByteStream& ByteStream::operator<<(const uint8_t b)
return *this;
}
ByteStream& ByteStream::operator<<(const bool b)
{
add(b);
return *this;
}
ByteStream& ByteStream::operator<<(const int16_t d)
{
if (fBuf == 0 || (fCurInPtr - fBuf + 2U > fMaxLen + ISSOverhead))
@ -304,14 +296,6 @@ ByteStream& ByteStream::operator>>(uint8_t& b)
return *this;
}
ByteStream& ByteStream::operator>>(bool& b)
{
peek(b);
fCurOutPtr++;
return *this;
}
ByteStream& ByteStream::operator>>(int16_t& d)
{
peek(d);
@ -398,15 +382,6 @@ void ByteStream::peek(uint8_t& b) const
b = *((int8_t*)fCurOutPtr);
}
void ByteStream::peek(bool& b) const
{
if (length() < 1)
throw underflow_error("ByteStream::peek(bool): not enough data in stream to fill datatype");
b = *((bool*)fCurOutPtr);
}
void ByteStream::peek(int16_t& d) const
{
if (length() < 2)

View File

@ -113,7 +113,6 @@ public:
* push a uint8_t onto the end of the stream
*/
EXPORT ByteStream& operator<<(const uint8_t b);
EXPORT ByteStream& operator<<(const bool b);
/**
* push a int16_t onto the end of the stream. The byte order is whatever the native byte order is.
*/
@ -196,7 +195,6 @@ public:
* extract a uint8_t from the front of the stream.
*/
EXPORT ByteStream& operator>>(uint8_t& b);
EXPORT ByteStream& operator>>(bool& b);
/**
* extract a int16_t from the front of the stream. The byte order is whatever the native byte order is.
*/
@ -275,7 +273,6 @@ public:
* Peek at a uint8_t from the front of the stream.
*/
EXPORT void peek(uint8_t& b) const;
EXPORT void peek(bool& b) const;
/**
* Peek at a int16_t from the front of the stream. The byte order is whatever the native byte order is.
*/