diff --git a/dbcon/execplan/calpontsystemcatalog.h b/dbcon/execplan/calpontsystemcatalog.h index 13df9a095..fa2a061ed 100644 --- a/dbcon/execplan/calpontsystemcatalog.h +++ b/dbcon/execplan/calpontsystemcatalog.h @@ -970,6 +970,15 @@ inline bool isCharType(const execplan::CalpontSystemCatalog::ColDataType type) execplan::CalpontSystemCatalog::TEXT == type); } +/** convenience function to determine if column type is BLOB or VARBINARY + */ +inline bool isBlobOrVarbinary(const execplan::CalpontSystemCatalog::ColDataType type) +{ + return (execplan::CalpontSystemCatalog::BLOB == type || + execplan::CalpontSystemCatalog::VARBINARY == type); +} + + /** convenience function to determine if column type is a * numeric type */ diff --git a/dbcon/mysql/ha_mcs_dml.cpp b/dbcon/mysql/ha_mcs_dml.cpp index 084f4f75d..4843e5522 100644 --- a/dbcon/mysql/ha_mcs_dml.cpp +++ b/dbcon/mysql/ha_mcs_dml.cpp @@ -884,93 +884,6 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca break; } - - case CalpontSystemCatalog::CHAR: - { - uint32_t colWidthInBytes = ci.columnTypes[colpos].colWidth; - - if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT)) - { - fprintf(ci.filePtr, "%c", ci.delimiter); - } - else - { - if (current_thd->variables.sql_mode & MODE_PAD_CHAR_TO_FULL_LENGTH) - { - // Pad to the full length of the field - escape.assign((char*)buf, colWidthInBytes); - - boost::replace_all(escape, "\\", "\\\\"); - - fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(), - escape.c_str(), ci.enclosed_by, ci.delimiter); - } - else - { - // Get the actual data length - Field* field = table->field[colpos]; - bitmap_set_bit(table->read_set, field->field_index); - String attribute; - field->val_str(&attribute); - - escape.assign((char*)buf, attribute.length()); - boost::replace_all(escape, "\\", "\\\\"); - - fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(), - escape.c_str(), ci.enclosed_by, ci.delimiter); - } - } - - buf += colWidthInBytes; - - break; - } - - case CalpontSystemCatalog::VARCHAR: - { - uint32_t colWidthInBytes = ci.columnTypes[colpos].colWidth; - - if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT)) - { - fprintf(ci.filePtr, "%c", ci.delimiter); - - if (colWidthInBytes < 256) - { - buf++; - } - else - { - buf = buf + 2 ; - } - } - else - { - // Maximum number of bytes allowed for a VARCHAR - // field is 65532, so the max length fits in 2 bytes. - // dataLength is length in bytes, not length in chars - uint16_t dataLength = 0; - - if (colWidthInBytes < 256) - { - dataLength = *(uint8_t*) buf; - buf++; - } - else - { - dataLength = *(uint16_t*) buf; - buf = buf + 2 ; - } - - escape.assign((char*)buf, dataLength); - boost::replace_all(escape, "\\", "\\\\"); - fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(), escape.c_str(), ci.enclosed_by, ci.delimiter); - } - - buf += colWidthInBytes; - - break; - } - case CalpontSystemCatalog::BIGINT: { if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT)) @@ -1659,134 +1572,43 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca break; } + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + case CalpontSystemCatalog::VARCHAR: + case CalpontSystemCatalog::BLOB: case CalpontSystemCatalog::VARBINARY: { - // For a VARBINARY field, ci.columnTypes[colpos].colWidth == colWidthInBytes + Field* fieldPtr = table->field[colpos]; if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT)) { fprintf(ci.filePtr, "%c", ci.delimiter); - - if (ci.columnTypes[colpos].colWidth < 256) + } + else + { + String value; + // We need to set table->read_set for a field first. + // This happens in ha_mcs_impl_start_bulk_insert(). + fieldPtr->val_str(&value); + if (UNLIKELY(execplan::isBlobOrVarbinary(ci.columnTypes[colpos].colDataType))) { - buf++; + const char* ptr = value.ptr(); + for (uint32_t i = 0; i < value.length(); i++) + { + fprintf(ci.filePtr, "%02x", *(uint8_t*)ptr+i); + } + fprintf(ci.filePtr, "%c", ci.delimiter); } else { - buf = buf + 2; + escape.assign(value.ptr(), value.length()); + boost::replace_all(escape, "\\", "\\\\"); + fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(), + escape.c_str(), ci.enclosed_by, ci.delimiter); } } - else - { - // Maximum number of bytes allowed for a VARBINARY - // field is 65532, so the max length fits in 2 bytes. - // dataLength is length in bytes, not length in chars - uint16_t dataLength = 0; - - if (ci.columnTypes[colpos].colWidth < 256) - { - dataLength = *(uint8_t*) buf; - buf++; - } - else - { - dataLength = *(uint16_t*) buf; - buf = buf + 2 ; - } - - const uchar* tmpBuf = buf; - - for (int32_t i = 0; i < dataLength; i++) - { - fprintf(ci.filePtr, "%02x", *(uint8_t*)tmpBuf); - tmpBuf++; - } - - fprintf(ci.filePtr, "%c", ci.delimiter); - } - - buf += ci.columnTypes[colpos].colWidth; - + buf+= fieldPtr->pack_length(); break; } - - case CalpontSystemCatalog::BLOB: - case CalpontSystemCatalog::TEXT: - { - // MCOL-4005 Note that we don't handle nulls as a special - // case here as we do for other datatypes, the below works - // as expected for nulls. - // dataLength is length in bytes, not length in chars - uint32_t dataLength = 0; - uintptr_t* dataptr; - uchar* ucharptr; - uint32_t colWidthInBytes; - - bool isBlob = ci.columnTypes[colpos].colDataType == CalpontSystemCatalog::BLOB; - - if (isBlob) - { - colWidthInBytes = ci.columnTypes[colpos].colWidth; - } - else - { - // For TEXT fields, MDB sets char_length to the maximum number that will fit in the number of bytes the - // defined TEXT length will fit in. - // Ex: - // TEXT(25) will have char_length() == 255; - // TEXT(200) for latin_1 will have char_length() = 255 - // TEXT(200) for udf8mb4 will have char_length() = 65535 - // the length 200 multiplied by mbmaxlen (4) is > 255, so it needs 2 bytes for length. MDB sets to max(uint_16t) - colWidthInBytes = table->field[colpos]->char_length(); - } - - if (colWidthInBytes < 256) - { - dataLength = *(uint8_t*) buf; - buf++; - } - else if (colWidthInBytes < 65536) - { - dataLength = *(uint16_t*) buf; - buf += 2; - } - else if (colWidthInBytes < 16777216) - { - dataLength = *(uint16_t*) buf; - dataLength |= ((int) buf[2]) << 16; - buf += 3; - } - else - { - dataLength = *(uint32_t*) buf; - buf += 4; - } - - // buf contains pointer to blob, for example: - // (gdb) p (char*)*(uintptr_t*)buf - // $43 = 0x7f68500c58f8 "hello world" - dataptr = (uintptr_t*)buf; - ucharptr = (uchar*)*dataptr; - buf += sizeof(uintptr_t); - - if (isBlob) - { - for (uint32_t i = 0; i < dataLength; i++) - { - fprintf(ci.filePtr, "%02x", *(uint8_t*)ucharptr); - ucharptr++; - } - fprintf(ci.filePtr, "%c", ci.delimiter); - } - else - { - escape.assign((char*)ucharptr, dataLength); - boost::replace_all(escape, "\\", "\\\\"); - fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(), escape.c_str(), ci.enclosed_by, ci.delimiter); - } - - break; - } - default: // treat as int64 { break; diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index 2f8cbe220..622636ccb 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -3539,7 +3539,9 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins ci->fdt[0] = -1; // now we can send all the data thru FIFO[1], writer of PARENT } - + // Set read_set used for bulk insertion of Fields inheriting + //from Field_blob|Field_varstring + bitmap_set_all(table->read_set); #endif } else @@ -3600,6 +3602,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins setError(current_thd, ER_READ_ONLY_MODE, "Cannot execute the statement. DBRM is read only!"); ci->rc = rc; ci->singleInsert = true; + bitmap_clear_all(table->read_set); return; } @@ -3609,6 +3612,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins if (stateFlags & SessionManagerServer::SS_SUSPENDED) { setError(current_thd, ER_INTERNAL_ERROR, "Writing to the database is disabled."); + bitmap_clear_all(table->read_set); return; } @@ -3624,10 +3628,11 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins catch (IDBExcept& ie) { setError(thd, ER_INTERNAL_ERROR, ie.what()); -// setError(thd, ER_UNKNOWN_TABLE, ie.what()); + bitmap_clear_all(table->read_set); } catch (std::exception& ex) { + bitmap_clear_all(table->read_set); setError(thd, ER_INTERNAL_ERROR, logging::IDBErrorInfo::instance()->errorMsg(ERR_SYSTEM_CATALOG) + ex.what()); } @@ -3641,6 +3646,9 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table) { + // Clear read_set used for bulk insertion of Fields inheriting + //from Field_blob|Field_varstring + bitmap_clear_all(table->read_set); THD* thd = current_thd; std::string aTmpDir(startup::StartUp::tmpDir()); @@ -3670,7 +3678,6 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table) ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ci->isCacheInsert) ) { #ifdef _MSC_VER - if (thd->killed > 0) { errno = 0;