1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-01 06:21:41 +03:00

Leverage MDB Field methods to output char or binary-based types when processing batch inserts

This commit is contained in:
Roman Nozdrin
2020-12-03 14:17:36 +00:00
parent ef78ec9278
commit 7fa5ca3a6d
3 changed files with 43 additions and 205 deletions

View File

@ -970,6 +970,15 @@ inline bool isCharType(const execplan::CalpontSystemCatalog::ColDataType type)
execplan::CalpontSystemCatalog::TEXT == type);
}
/** convenience function to determine if column type is BLOB or VARBINARY
*/
inline bool isBlobOrVarbinary(const execplan::CalpontSystemCatalog::ColDataType type)
{
return (execplan::CalpontSystemCatalog::BLOB == type ||
execplan::CalpontSystemCatalog::VARBINARY == type);
}
/** convenience function to determine if column type is a
* numeric type
*/

View File

@ -884,93 +884,6 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
break;
}
case CalpontSystemCatalog::CHAR:
{
uint32_t colWidthInBytes = ci.columnTypes[colpos].colWidth;
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
{
fprintf(ci.filePtr, "%c", ci.delimiter);
}
else
{
if (current_thd->variables.sql_mode & MODE_PAD_CHAR_TO_FULL_LENGTH)
{
// Pad to the full length of the field
escape.assign((char*)buf, colWidthInBytes);
boost::replace_all(escape, "\\", "\\\\");
fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(),
escape.c_str(), ci.enclosed_by, ci.delimiter);
}
else
{
// Get the actual data length
Field* field = table->field[colpos];
bitmap_set_bit(table->read_set, field->field_index);
String attribute;
field->val_str(&attribute);
escape.assign((char*)buf, attribute.length());
boost::replace_all(escape, "\\", "\\\\");
fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(),
escape.c_str(), ci.enclosed_by, ci.delimiter);
}
}
buf += colWidthInBytes;
break;
}
case CalpontSystemCatalog::VARCHAR:
{
uint32_t colWidthInBytes = ci.columnTypes[colpos].colWidth;
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
{
fprintf(ci.filePtr, "%c", ci.delimiter);
if (colWidthInBytes < 256)
{
buf++;
}
else
{
buf = buf + 2 ;
}
}
else
{
// Maximum number of bytes allowed for a VARCHAR
// field is 65532, so the max length fits in 2 bytes.
// dataLength is length in bytes, not length in chars
uint16_t dataLength = 0;
if (colWidthInBytes < 256)
{
dataLength = *(uint8_t*) buf;
buf++;
}
else
{
dataLength = *(uint16_t*) buf;
buf = buf + 2 ;
}
escape.assign((char*)buf, dataLength);
boost::replace_all(escape, "\\", "\\\\");
fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(), escape.c_str(), ci.enclosed_by, ci.delimiter);
}
buf += colWidthInBytes;
break;
}
case CalpontSystemCatalog::BIGINT:
{
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
@ -1659,134 +1572,43 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
break;
}
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::TEXT:
case CalpontSystemCatalog::VARCHAR:
case CalpontSystemCatalog::BLOB:
case CalpontSystemCatalog::VARBINARY:
{
// For a VARBINARY field, ci.columnTypes[colpos].colWidth == colWidthInBytes
Field* fieldPtr = table->field[colpos];
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
{
fprintf(ci.filePtr, "%c", ci.delimiter);
if (ci.columnTypes[colpos].colWidth < 256)
}
else
{
String value;
// We need to set table->read_set for a field first.
// This happens in ha_mcs_impl_start_bulk_insert().
fieldPtr->val_str(&value);
if (UNLIKELY(execplan::isBlobOrVarbinary(ci.columnTypes[colpos].colDataType)))
{
buf++;
const char* ptr = value.ptr();
for (uint32_t i = 0; i < value.length(); i++)
{
fprintf(ci.filePtr, "%02x", *(uint8_t*)ptr+i);
}
fprintf(ci.filePtr, "%c", ci.delimiter);
}
else
{
buf = buf + 2;
escape.assign(value.ptr(), value.length());
boost::replace_all(escape, "\\", "\\\\");
fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(),
escape.c_str(), ci.enclosed_by, ci.delimiter);
}
}
else
{
// Maximum number of bytes allowed for a VARBINARY
// field is 65532, so the max length fits in 2 bytes.
// dataLength is length in bytes, not length in chars
uint16_t dataLength = 0;
if (ci.columnTypes[colpos].colWidth < 256)
{
dataLength = *(uint8_t*) buf;
buf++;
}
else
{
dataLength = *(uint16_t*) buf;
buf = buf + 2 ;
}
const uchar* tmpBuf = buf;
for (int32_t i = 0; i < dataLength; i++)
{
fprintf(ci.filePtr, "%02x", *(uint8_t*)tmpBuf);
tmpBuf++;
}
fprintf(ci.filePtr, "%c", ci.delimiter);
}
buf += ci.columnTypes[colpos].colWidth;
buf+= fieldPtr->pack_length();
break;
}
case CalpontSystemCatalog::BLOB:
case CalpontSystemCatalog::TEXT:
{
// MCOL-4005 Note that we don't handle nulls as a special
// case here as we do for other datatypes, the below works
// as expected for nulls.
// dataLength is length in bytes, not length in chars
uint32_t dataLength = 0;
uintptr_t* dataptr;
uchar* ucharptr;
uint32_t colWidthInBytes;
bool isBlob = ci.columnTypes[colpos].colDataType == CalpontSystemCatalog::BLOB;
if (isBlob)
{
colWidthInBytes = ci.columnTypes[colpos].colWidth;
}
else
{
// For TEXT fields, MDB sets char_length to the maximum number that will fit in the number of bytes the
// defined TEXT length will fit in.
// Ex:
// TEXT(25) will have char_length() == 255;
// TEXT(200) for latin_1 will have char_length() = 255
// TEXT(200) for udf8mb4 will have char_length() = 65535
// the length 200 multiplied by mbmaxlen (4) is > 255, so it needs 2 bytes for length. MDB sets to max(uint_16t)
colWidthInBytes = table->field[colpos]->char_length();
}
if (colWidthInBytes < 256)
{
dataLength = *(uint8_t*) buf;
buf++;
}
else if (colWidthInBytes < 65536)
{
dataLength = *(uint16_t*) buf;
buf += 2;
}
else if (colWidthInBytes < 16777216)
{
dataLength = *(uint16_t*) buf;
dataLength |= ((int) buf[2]) << 16;
buf += 3;
}
else
{
dataLength = *(uint32_t*) buf;
buf += 4;
}
// buf contains pointer to blob, for example:
// (gdb) p (char*)*(uintptr_t*)buf
// $43 = 0x7f68500c58f8 "hello world"
dataptr = (uintptr_t*)buf;
ucharptr = (uchar*)*dataptr;
buf += sizeof(uintptr_t);
if (isBlob)
{
for (uint32_t i = 0; i < dataLength; i++)
{
fprintf(ci.filePtr, "%02x", *(uint8_t*)ucharptr);
ucharptr++;
}
fprintf(ci.filePtr, "%c", ci.delimiter);
}
else
{
escape.assign((char*)ucharptr, dataLength);
boost::replace_all(escape, "\\", "\\\\");
fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(), escape.c_str(), ci.enclosed_by, ci.delimiter);
}
break;
}
default: // treat as int64
{
break;

View File

@ -3539,7 +3539,9 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
ci->fdt[0] = -1;
// now we can send all the data thru FIFO[1], writer of PARENT
}
// Set read_set used for bulk insertion of Fields inheriting
//from Field_blob|Field_varstring
bitmap_set_all(table->read_set);
#endif
}
else
@ -3600,6 +3602,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
setError(current_thd, ER_READ_ONLY_MODE, "Cannot execute the statement. DBRM is read only!");
ci->rc = rc;
ci->singleInsert = true;
bitmap_clear_all(table->read_set);
return;
}
@ -3609,6 +3612,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
if (stateFlags & SessionManagerServer::SS_SUSPENDED)
{
setError(current_thd, ER_INTERNAL_ERROR, "Writing to the database is disabled.");
bitmap_clear_all(table->read_set);
return;
}
@ -3624,10 +3628,11 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
catch (IDBExcept& ie)
{
setError(thd, ER_INTERNAL_ERROR, ie.what());
// setError(thd, ER_UNKNOWN_TABLE, ie.what());
bitmap_clear_all(table->read_set);
}
catch (std::exception& ex)
{
bitmap_clear_all(table->read_set);
setError(thd, ER_INTERNAL_ERROR,
logging::IDBErrorInfo::instance()->errorMsg(ERR_SYSTEM_CATALOG) + ex.what());
}
@ -3641,6 +3646,9 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
{
// Clear read_set used for bulk insertion of Fields inheriting
//from Field_blob|Field_varstring
bitmap_clear_all(table->read_set);
THD* thd = current_thd;
std::string aTmpDir(startup::StartUp::tmpDir());
@ -3670,7 +3678,6 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ci->isCacheInsert) )
{
#ifdef _MSC_VER
if (thd->killed > 0)
{
errno = 0;