From 8206fafb7c5b753bce8ba9028913d770cd678118 Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Tue, 19 Jan 2021 10:37:58 +0000 Subject: [PATCH] MCOL-4496 Upmerged the commit 7fa5ca3a6 to generalise CHAR|VARCHAR|TEXT|BLOB bulk insertion processing and fix the original issue introduced by MCOL-2000 --- dbcon/mysql/ha_mcs_datatype.h | 247 +++++++++------------------------- dbcon/mysql/ha_mcs_impl.cpp | 11 +- 2 files changed, 71 insertions(+), 187 deletions(-) diff --git a/dbcon/mysql/ha_mcs_datatype.h b/dbcon/mysql/ha_mcs_datatype.h index cb753b454..5fbed26e5 100644 --- a/dbcon/mysql/ha_mcs_datatype.h +++ b/dbcon/mysql/ha_mcs_datatype.h @@ -185,7 +185,6 @@ public: /*******************************************************************************/ - class WriteBatchFieldMariaDB: public WriteBatchField { // Maximum number of decimal digits that can be represented in 4 bytes @@ -343,89 +342,76 @@ public: return m_field->pack_length(); } - size_t ColWriteBatchChar(const uchar *buf, bool nullVal, ColBatchWriter &ci) override + static inline void ColWriteBatchTextStringPrintout(std::string& escape, + const ColBatchWriter &ci) + { + boost::replace_all(escape, "\\", "\\\\"); + fprintf(ci.filePtr(), "%c%.*s%c%c", ci.enclosed_by(), (int)escape.length(), + escape.c_str(), ci.enclosed_by(), ci.delimiter()); + } + + static void ColWriteBatchTextString(const String &value, + const ColBatchWriter &ci, + const size_t colWidthInBytes) + { + std::string escape; + escape.assign(value.ptr(), value.length()); + ColWriteBatchTextStringPrintout(escape, ci); + } + + static void ColWriteBatchPaddedTextString(const String &value, + const ColBatchWriter &ci, + const size_t colWidthInBytes) + { + std::string escape; + escape.assign(value.ptr(), colWidthInBytes); + ColWriteBatchTextStringPrintout(escape, ci); + } + + static void ColWriteBatchBlobString(const String &value, + const ColBatchWriter &ci, + const size_t colWidthInBytes) + { + const char* ptr = value.ptr(); + for (uint32_t i = 0; i < value.length(); i++) + { + fprintf(ci.filePtr(), "%02x", *(uint8_t*)ptr+i); + } + fprintf(ci.filePtr(), "%c", ci.delimiter()); + } + + size_t ColWriteBatchString(const uchar *buf, + bool nullVal, + ColBatchWriter &ci, + void (*printFuncPtr)(const String&, + const ColBatchWriter &, + const size_t colWidthInBytes)) const { - uint32_t colWidthInBytes = m_type.colWidth * m_mbmaxlen; if (nullVal && (m_type.constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT)) { fprintf(ci.filePtr(), "%c", ci.delimiter()); } else { - if (current_thd->variables.sql_mode & MODE_PAD_CHAR_TO_FULL_LENGTH) - { - std::string escape; - // Pad to the full length of the field - escape.assign((char*)buf, colWidthInBytes); - - boost::replace_all(escape, "\\", "\\\\"); - - fprintf(ci.filePtr(), "%c%.*s%c%c", - ci.enclosed_by(), - (int)escape.length(), escape.c_str(), - ci.enclosed_by(), ci.delimiter()); - } - else - { - std::string escape; - // Get the actual data length - bitmap_set_bit(m_field->table->read_set, m_field->field_index); - String attribute; - m_field->val_str(&attribute); - - escape.assign((char*)buf, attribute.length()); - boost::replace_all(escape, "\\", "\\\\"); - - fprintf(ci.filePtr(), "%c%.*s%c%c", - ci.enclosed_by(), - (int)escape.length(), escape.c_str(), - ci.enclosed_by(), ci.delimiter()); - } + String value; + // We need to set table->read_set for a field first. + // This happens in ha_mcs_impl_start_bulk_insert(). + m_field->val_str(&value); + (*printFuncPtr)(value, ci, m_field->pack_length()); } + return m_field->pack_length(); + } - return colWidthInBytes; + size_t ColWriteBatchChar(const uchar *buf, bool nullVal, ColBatchWriter &ci) override + { + return (current_thd->variables.sql_mode & MODE_PAD_CHAR_TO_FULL_LENGTH) ? + ColWriteBatchString(buf, nullVal, ci, &ColWriteBatchPaddedTextString) : + ColWriteBatchString(buf, nullVal, ci, &ColWriteBatchTextString); } size_t ColWriteBatchVarchar(const uchar *buf, bool nullVal, ColBatchWriter &ci) override { - const uchar *buf0= buf; - uint32_t colWidthInBytes = m_type.colWidth * m_mbmaxlen; - if (nullVal && (m_type.constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT)) - { - fprintf(ci.filePtr(), "%c", ci.delimiter()); - if (colWidthInBytes < 256) - { - buf++; - } - else - { - buf = buf + 2 ; - } - } - else - { - int dataLength = 0; - - if (colWidthInBytes < 256) - { - dataLength = *(uint8_t*) buf; - buf++; - } - else - { - dataLength = *(uint16_t*) buf; - buf = buf + 2 ; - } - std::string escape; - escape.assign((char*)buf, dataLength); - boost::replace_all(escape, "\\", "\\\\"); - fprintf(ci.filePtr(), "%c%.*s%c%c", - ci.enclosed_by(), - (int)escape.length(), escape.c_str(), - ci.enclosed_by(), ci.delimiter()); - } - buf += colWidthInBytes; - return buf - buf0; + return ColWriteBatchString(buf, nullVal, ci, &ColWriteBatchTextString); } size_t ColWriteBatchSInt64(const uchar *buf, bool nullVal, ColBatchWriter &ci) override @@ -854,126 +840,15 @@ public: size_t ColWriteBatchVarbinary(const uchar *buf0, bool nullVal, ColBatchWriter &ci) override { - const uchar *buf= buf0; - if (nullVal && (m_type.constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT)) - { - fprintf(ci.filePtr(), "%c", ci.delimiter()); - - if (m_type.colWidth < 256) - { - buf++; - } - else - { - buf = buf + 2; - } - } - else - { - uint16_t dataLength = 0; - - if (m_type.colWidth < 256) - { - dataLength = *(int8_t*) buf; - buf++; - } - else - { - dataLength = *(int16_t*) buf; - buf = buf + 2 ; - } - - const uchar* tmpBuf = buf; - - for (int32_t i = 0; i < dataLength; i++) - { - fprintf(ci.filePtr(), "%02x", *(uint8_t*)tmpBuf); - tmpBuf++; - } - - fprintf(ci.filePtr(), "%c", ci.delimiter()); - - } - - return buf - buf0; + return ColWriteBatchString(buf0, nullVal, ci, &ColWriteBatchBlobString); } size_t ColWriteBatchBlob(const uchar *buf0, bool nullVal, ColBatchWriter &ci) override { - const uchar *buf= buf0; - // MCOL-4005 Note that we don't handle nulls as a special - // case here as we do for other datatypes, the below works - // as expected for nulls. - uint32_t dataLength = 0; - uintptr_t* dataptr; - uchar* ucharptr; - bool isBlob = m_type.colDataType == CalpontSystemCatalog::BLOB; - uint colWidthInBytes = isBlob ? m_type.colWidth : m_type.colWidth * m_mbmaxlen; - - if (!isBlob && m_field->char_length() == 65535) - { - // Special case for TEXT field without default length, - // such as: - // CREATE TABLE mcol4364 (a TEXT); - // Here, char_length() represents the number of bytes, - // not number of characters. - dataLength = *(uint16_t*) buf; - buf += 2; - } - else if (colWidthInBytes < 256) - { - dataLength = *(uint8_t*) buf; - buf++; - } - else if (colWidthInBytes < 65536) - { - dataLength = *(uint16_t*) buf; - buf += 2; - } - else if (colWidthInBytes < 16777216) - { - dataLength = *(uint16_t*) buf; - dataLength |= ((int) buf[2]) << 16; - buf += 3; - } - else - { - dataLength = *(uint32_t*) buf; - buf += 4; - } - - // buf contains pointer to blob, for example: - // (gdb) p (char*)*(uintptr_t*)buf - // $43 = 0x7f68500c58f8 "hello world" - - dataptr = (uintptr_t*)buf; - ucharptr = (uchar*)*dataptr; - buf += sizeof(uintptr_t); - - if (isBlob) - { - for (uint32_t i = 0; i < dataLength; i++) - { - fprintf(ci.filePtr(), "%02x", *(uint8_t*)ucharptr); - ucharptr++; - } - - fprintf(ci.filePtr(), "%c", ci.delimiter()); - } - else - { - // TEXT Column - std::string escape; - escape.assign((char*)ucharptr, dataLength); - boost::replace_all(escape, "\\", "\\\\"); - fprintf(ci.filePtr(), "%c%.*s%c%c", - ci.enclosed_by(), - (int)escape.length(), escape.c_str(), - ci.enclosed_by(), ci.delimiter()); - } - - return buf - buf0; + return (UNLIKELY(m_type.colDataType == CalpontSystemCatalog::BLOB)) ? + ColWriteBatchString(buf0, nullVal, ci, &ColWriteBatchBlobString) : + ColWriteBatchString(buf0, nullVal, ci, &ColWriteBatchTextString); } }; diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index d1868f52d..9455885da 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -3186,6 +3186,9 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins ci->fdt[0] = -1; // now we can send all the data thru FIFO[1], writer of PARENT } + // Set read_set used for bulk insertion of Fields inheriting + //from Field_blob|Field_varstring. Used in ColWriteBatchString() + bitmap_set_all(table->read_set); #endif } @@ -3247,6 +3250,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins setError(current_thd, ER_READ_ONLY_MODE, "Cannot execute the statement. DBRM is read only!"); ci->rc = rc; ci->singleInsert = true; + bitmap_clear_all(table->read_set); return; } @@ -3256,6 +3260,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins if (stateFlags & SessionManagerServer::SS_SUSPENDED) { setError(current_thd, ER_INTERNAL_ERROR, "Writing to the database is disabled."); + bitmap_clear_all(table->read_set); return; } @@ -3270,11 +3275,12 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins } catch (IDBExcept& ie) { + bitmap_clear_all(table->read_set); setError(thd, ER_INTERNAL_ERROR, ie.what()); -// setError(thd, ER_UNKNOWN_TABLE, ie.what()); } catch (std::exception& ex) { + bitmap_clear_all(table->read_set); setError(thd, ER_INTERNAL_ERROR, logging::IDBErrorInfo::instance()->errorMsg(ERR_SYSTEM_CATALOG) + ex.what()); } @@ -3288,6 +3294,9 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table) { + // Clear read_set used for bulk insertion of Fields inheriting + //from Field_blob|Field_varstring + bitmap_clear_all(table->read_set); THD* thd = current_thd; std::string aTmpDir(startup::StartUp::tmpDir());