1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-04 04:42:30 +03:00

MCOL-4320/4364/4370 Fix multibyte processing for LDI/Insert...Select

For CHAR/VARCHAR/TEXT fields, the buffer size of a field represents
the field size in bytes, which can be bigger than the field size in
number of characters, for multi-byte character sets such as utf8,
utf8mb4 etc. The buffer also contains a length prefix that stores the
data length in bytes; that length can be up to 65532 for a VARCHAR
field, and much higher for a TEXT field (the largest byte length we
process for a TEXT field is one that fits in 4 bytes, i.e. 2^32 - 1
bytes, or roughly 4GB).

There is also special processing for a TEXT field declared without an
explicit length (i.e. one that uses the default length), like so:
  CREATE TABLE cs1 (a TEXT CHARACTER SET utf8)
Here, the byte length is a fixed 65535, irrespective of the character
set used. This is different from a case such as:
  CREATE TABLE cs1 (a TEXT(65535) CHARACTER SET utf8), where the byte length
for the field will be 65535*3.
This commit is contained in:
Gagan Goel
2020-10-26 17:51:24 +00:00
parent 26131a1b43
commit 13264feb7d
3 changed files with 80 additions and 141 deletions

View File

@ -887,6 +887,11 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
case CalpontSystemCatalog::CHAR:
{
Field* field = table->field[colpos];
uint32_t colWidthInBytes =
ci.columnTypes[colpos].colWidth * field->charset()->mbmaxlen;
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
{
fprintf(ci.filePtr, "%c", ci.delimiter);
@ -896,10 +901,7 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
if (current_thd->variables.sql_mode & MODE_PAD_CHAR_TO_FULL_LENGTH)
{
// Pad to the full length of the field
if (ci.utf8)
escape.assign((char*)buf, ci.columnTypes[colpos].colWidth * 3);
else
escape.assign((char*)buf, ci.columnTypes[colpos].colWidth);
escape.assign((char*)buf, colWidthInBytes);
boost::replace_all(escape, "\\", "\\\\");
@ -922,23 +924,23 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
}
}
if (ci.utf8)
buf += (ci.columnTypes[colpos].colWidth * 3);
else
buf += ci.columnTypes[colpos].colWidth;
buf += colWidthInBytes;
break;
}
case CalpontSystemCatalog::VARCHAR:
{
Field* field = table->field[colpos];
uint32_t colWidthInBytes =
ci.columnTypes[colpos].colWidth * field->charset()->mbmaxlen;
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
{
fprintf(ci.filePtr, "%c", ci.delimiter);
if (!ci.utf8)
{
if (ci.columnTypes[colpos].colWidth < 256)
if (colWidthInBytes < 256)
{
buf++;
}
@ -947,41 +949,14 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
buf = buf + 2 ;
}
}
else //utf8
{
if (ci.columnTypes[colpos].colWidth < 86)
{
buf++;
}
else
{
buf = buf + 2 ;
}
}
}
else
{
int dataLength = 0;
// Maximum number of bytes allowed for a VARCHAR
// field is 65532, so the max length fits in 2 bytes.
// dataLength is length in bytes, not length in chars
uint16_t dataLength = 0;
if (!ci.utf8)
{
if (ci.columnTypes[colpos].colWidth < 256)
{
dataLength = *(uint8_t*) buf;
buf++;
}
else
{
dataLength = *(uint16_t*) buf;
buf = buf + 2 ;
}
escape.assign((char*)buf, dataLength);
boost::replace_all(escape, "\\", "\\\\");
fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(), escape.c_str(), ci.enclosed_by, ci.delimiter);
}
else //utf8
{
if (ci.columnTypes[colpos].colWidth < 86)
if (colWidthInBytes < 256)
{
dataLength = *(uint8_t*) buf;
buf++;
@ -994,14 +969,11 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
escape.assign((char*)buf, dataLength);
boost::replace_all(escape, "\\", "\\\\");
fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(), escape.c_str(), ci.enclosed_by, ci.delimiter);
}
}
if (ci.utf8)
buf += (ci.columnTypes[colpos].colWidth * 3);
else
buf += ci.columnTypes[colpos].colWidth;
buf += colWidthInBytes;
break;
}
@ -1695,12 +1667,11 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
case CalpontSystemCatalog::VARBINARY:
{
// For a VARBINARY field, ci.columnTypes[colpos].colWidth == colWidthInBytes
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
{
fprintf(ci.filePtr, "%c", ci.delimiter);
if (!ci.utf8)
{
if (ci.columnTypes[colpos].colWidth < 256)
{
buf++;
@ -1710,50 +1681,16 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
buf = buf + 2;
}
}
else //utf8
{
if (ci.columnTypes[colpos].colWidth < 86)
{
buf++;
}
else
{
buf = buf + 2 ;
}
}
}
else
{
int dataLength = 0;
// Maximum number of bytes allowed for a VARBINARY
// field is 65532, so the max length fits in 2 bytes.
// dataLength is length in bytes, not length in chars
uint16_t dataLength = 0;
if (!ci.utf8)
{
if (ci.columnTypes[colpos].colWidth < 256)
{
dataLength = *(int8_t*) buf;
buf++;
}
else
{
dataLength = *(int16_t*) buf;
buf = buf + 2 ;
}
const uchar* tmpBuf = buf;
for (int32_t i = 0; i < dataLength; i++)
{
fprintf(ci.filePtr, "%02x", *(uint8_t*)tmpBuf);
tmpBuf++;
}
fprintf(ci.filePtr, "%c", ci.delimiter);
}
else //utf8
{
if (ci.columnTypes[colpos].colWidth < 86)
{
dataLength = *(int8_t*) buf;
dataLength = *(uint8_t*) buf;
buf++;
}
else
@ -1762,9 +1699,6 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
buf = buf + 2 ;
}
if ( dataLength > ci.columnTypes[colpos].colWidth)
dataLength = ci.columnTypes[colpos].colWidth;
const uchar* tmpBuf = buf;
for (int32_t i = 0; i < dataLength; i++)
@ -1775,11 +1709,7 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
fprintf(ci.filePtr, "%c", ci.delimiter);
}
}
if (ci.utf8)
buf += (ci.columnTypes[colpos].colWidth * 3);
else
buf += ci.columnTypes[colpos].colWidth;
break;
@ -1791,13 +1721,30 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
// MCOL-4005 Note that we don't handle nulls as a special
// case here as we do for other datatypes, the below works
// as expected for nulls.
// dataLength is length in bytes, not length in chars
uint32_t dataLength = 0;
uintptr_t* dataptr;
uchar* ucharptr;
uint colWidthInBytes = (ci.utf8 ?
ci.columnTypes[colpos].colWidth * 3: ci.columnTypes[colpos].colWidth);
if (colWidthInBytes < 256)
bool isBlob =
ci.columnTypes[colpos].colDataType == CalpontSystemCatalog::BLOB;
Field* field = table->field[colpos];
uint32_t colWidthInBytes = isBlob ? ci.columnTypes[colpos].colWidth :
ci.columnTypes[colpos].colWidth * field->charset()->mbmaxlen;
if (!isBlob && field->char_length() == 65535)
{
// Special case for a TEXT field declared without an
// explicit length (i.e. using the default length), such as:
// CREATE TABLE mcol4364 (a TEXT);
// Here, char_length() represents the number of bytes,
// not number of characters.
dataLength = *(uint16_t*) buf;
buf += 2;
}
else if (colWidthInBytes < 256)
{
dataLength = *(uint8_t*) buf;
buf++;
@ -1827,7 +1774,7 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
ucharptr = (uchar*)*dataptr;
buf += sizeof(uintptr_t);
if (ci.columnTypes[colpos].colDataType == CalpontSystemCatalog::BLOB)
if (isBlob)
{
for (uint32_t i = 0; i < dataLength; i++)
{

View File

@ -3208,7 +3208,6 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
tableName.schema = table->s->db.str;
tableName.table = table->s->table_name.str;
ci->useXbit = false;
ci->utf8 = false;
CalpontSystemCatalog::RIDList colrids;
try
@ -3253,11 +3252,6 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
else
ci->headerLength = (1 + colrids.size() + 7 - numberNotNull) / 8;
if ((strncmp(table->s->table_charset->comment, "UTF-8", 5) == 0) || (strncmp(table->s->table_charset->comment, "utf-8", 5) == 0))
{
ci->utf8 = true;
}
//Log the statement to debug.log
{
ostringstream oss;

View File

@ -259,7 +259,6 @@ struct cal_connection_info
filePtr(0),
headerLength(0),
useXbit(false),
utf8(false),
useCpimport(1),
delimiter('\7'),
affectedRows(0)
@ -327,7 +326,6 @@ struct cal_connection_info
FILE* filePtr;
uint8_t headerLength;
bool useXbit;
bool utf8;
uint8_t useCpimport;
char delimiter;
char enclosed_by;