You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-04 04:42:30 +03:00
MCOL-4320/4364/4370 Fix multibyte processing for LDI/Insert...Select
For CHAR/VARCHAR/TEXT fields, the buffer size of a field is the field size in bytes, which can be larger than the field size in characters for multi-byte character sets such as utf8 and utf8mb4. The buffer also contains a length prefix that holds the data length in bytes; that length can be up to 65532 for a VARCHAR field and much larger for a TEXT field (the maximum byte length we process for a TEXT field is the largest value that fits in 4 bytes, 2^32 - 1, i.e. about 4 GB). There is also special processing for a TEXT field defined with the default length, like so: CREATE TABLE cs1 (a TEXT CHARACTER SET utf8). Here, the byte length is fixed at 65535, irrespective of the character set used. This differs from a case such as CREATE TABLE cs1 (a TEXT(65535) CHARACTER SET utf8), where the byte length of the field is 65535*3.
This commit is contained in:
@ -887,6 +887,11 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
|
||||
|
||||
case CalpontSystemCatalog::CHAR:
|
||||
{
|
||||
Field* field = table->field[colpos];
|
||||
|
||||
uint32_t colWidthInBytes =
|
||||
ci.columnTypes[colpos].colWidth * field->charset()->mbmaxlen;
|
||||
|
||||
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
|
||||
{
|
||||
fprintf(ci.filePtr, "%c", ci.delimiter);
|
||||
@ -896,10 +901,7 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
|
||||
if (current_thd->variables.sql_mode & MODE_PAD_CHAR_TO_FULL_LENGTH)
|
||||
{
|
||||
// Pad to the full length of the field
|
||||
if (ci.utf8)
|
||||
escape.assign((char*)buf, ci.columnTypes[colpos].colWidth * 3);
|
||||
else
|
||||
escape.assign((char*)buf, ci.columnTypes[colpos].colWidth);
|
||||
escape.assign((char*)buf, colWidthInBytes);
|
||||
|
||||
boost::replace_all(escape, "\\", "\\\\");
|
||||
|
||||
@ -922,23 +924,23 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
|
||||
}
|
||||
}
|
||||
|
||||
if (ci.utf8)
|
||||
buf += (ci.columnTypes[colpos].colWidth * 3);
|
||||
else
|
||||
buf += ci.columnTypes[colpos].colWidth;
|
||||
buf += colWidthInBytes;
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case CalpontSystemCatalog::VARCHAR:
|
||||
{
|
||||
Field* field = table->field[colpos];
|
||||
|
||||
uint32_t colWidthInBytes =
|
||||
ci.columnTypes[colpos].colWidth * field->charset()->mbmaxlen;
|
||||
|
||||
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
|
||||
{
|
||||
fprintf(ci.filePtr, "%c", ci.delimiter);
|
||||
|
||||
if (!ci.utf8)
|
||||
{
|
||||
if (ci.columnTypes[colpos].colWidth < 256)
|
||||
if (colWidthInBytes < 256)
|
||||
{
|
||||
buf++;
|
||||
}
|
||||
@ -947,41 +949,14 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
|
||||
buf = buf + 2 ;
|
||||
}
|
||||
}
|
||||
else //utf8
|
||||
{
|
||||
if (ci.columnTypes[colpos].colWidth < 86)
|
||||
{
|
||||
buf++;
|
||||
}
|
||||
else
|
||||
{
|
||||
buf = buf + 2 ;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
int dataLength = 0;
|
||||
// Maximum number of bytes allowed for a VARCHAR
|
||||
// field is 65532, so the max length fits in 2 bytes.
|
||||
// dataLength is length in bytes, not length in chars
|
||||
uint16_t dataLength = 0;
|
||||
|
||||
if (!ci.utf8)
|
||||
{
|
||||
if (ci.columnTypes[colpos].colWidth < 256)
|
||||
{
|
||||
dataLength = *(uint8_t*) buf;
|
||||
buf++;
|
||||
}
|
||||
else
|
||||
{
|
||||
dataLength = *(uint16_t*) buf;
|
||||
buf = buf + 2 ;
|
||||
}
|
||||
escape.assign((char*)buf, dataLength);
|
||||
boost::replace_all(escape, "\\", "\\\\");
|
||||
fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(), escape.c_str(), ci.enclosed_by, ci.delimiter);
|
||||
}
|
||||
else //utf8
|
||||
{
|
||||
if (ci.columnTypes[colpos].colWidth < 86)
|
||||
if (colWidthInBytes < 256)
|
||||
{
|
||||
dataLength = *(uint8_t*) buf;
|
||||
buf++;
|
||||
@ -994,14 +969,11 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
|
||||
|
||||
escape.assign((char*)buf, dataLength);
|
||||
boost::replace_all(escape, "\\", "\\\\");
|
||||
|
||||
fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(), escape.c_str(), ci.enclosed_by, ci.delimiter);
|
||||
}
|
||||
}
|
||||
if (ci.utf8)
|
||||
buf += (ci.columnTypes[colpos].colWidth * 3);
|
||||
else
|
||||
buf += ci.columnTypes[colpos].colWidth;
|
||||
|
||||
buf += colWidthInBytes;
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
@ -1695,12 +1667,11 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
|
||||
|
||||
case CalpontSystemCatalog::VARBINARY:
|
||||
{
|
||||
// For a VARBINARY field, ci.columnTypes[colpos].colWidth == colWidthInBytes
|
||||
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
|
||||
{
|
||||
fprintf(ci.filePtr, "%c", ci.delimiter);
|
||||
|
||||
if (!ci.utf8)
|
||||
{
|
||||
if (ci.columnTypes[colpos].colWidth < 256)
|
||||
{
|
||||
buf++;
|
||||
@ -1710,50 +1681,16 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
|
||||
buf = buf + 2;
|
||||
}
|
||||
}
|
||||
else //utf8
|
||||
{
|
||||
if (ci.columnTypes[colpos].colWidth < 86)
|
||||
{
|
||||
buf++;
|
||||
}
|
||||
else
|
||||
{
|
||||
buf = buf + 2 ;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
int dataLength = 0;
|
||||
// Maximum number of bytes allowed for a VARBINARY
|
||||
// field is 65532, so the max length fits in 2 bytes.
|
||||
// dataLength is length in bytes, not length in chars
|
||||
uint16_t dataLength = 0;
|
||||
|
||||
if (!ci.utf8)
|
||||
{
|
||||
if (ci.columnTypes[colpos].colWidth < 256)
|
||||
{
|
||||
dataLength = *(int8_t*) buf;
|
||||
buf++;
|
||||
}
|
||||
else
|
||||
{
|
||||
dataLength = *(int16_t*) buf;
|
||||
buf = buf + 2 ;
|
||||
}
|
||||
|
||||
const uchar* tmpBuf = buf;
|
||||
|
||||
for (int32_t i = 0; i < dataLength; i++)
|
||||
{
|
||||
fprintf(ci.filePtr, "%02x", *(uint8_t*)tmpBuf);
|
||||
tmpBuf++;
|
||||
}
|
||||
|
||||
fprintf(ci.filePtr, "%c", ci.delimiter);
|
||||
}
|
||||
else //utf8
|
||||
{
|
||||
if (ci.columnTypes[colpos].colWidth < 86)
|
||||
{
|
||||
dataLength = *(int8_t*) buf;
|
||||
dataLength = *(uint8_t*) buf;
|
||||
buf++;
|
||||
}
|
||||
else
|
||||
@ -1762,9 +1699,6 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
|
||||
buf = buf + 2 ;
|
||||
}
|
||||
|
||||
if ( dataLength > ci.columnTypes[colpos].colWidth)
|
||||
dataLength = ci.columnTypes[colpos].colWidth;
|
||||
|
||||
const uchar* tmpBuf = buf;
|
||||
|
||||
for (int32_t i = 0; i < dataLength; i++)
|
||||
@ -1775,11 +1709,7 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
|
||||
|
||||
fprintf(ci.filePtr, "%c", ci.delimiter);
|
||||
}
|
||||
}
|
||||
|
||||
if (ci.utf8)
|
||||
buf += (ci.columnTypes[colpos].colWidth * 3);
|
||||
else
|
||||
buf += ci.columnTypes[colpos].colWidth;
|
||||
|
||||
break;
|
||||
@ -1791,13 +1721,30 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
|
||||
// MCOL-4005 Note that we don't handle nulls as a special
|
||||
// case here as we do for other datatypes, the below works
|
||||
// as expected for nulls.
|
||||
// dataLength is length in bytes, not length in chars
|
||||
uint32_t dataLength = 0;
|
||||
uintptr_t* dataptr;
|
||||
uchar* ucharptr;
|
||||
uint colWidthInBytes = (ci.utf8 ?
|
||||
ci.columnTypes[colpos].colWidth * 3: ci.columnTypes[colpos].colWidth);
|
||||
|
||||
if (colWidthInBytes < 256)
|
||||
bool isBlob =
|
||||
ci.columnTypes[colpos].colDataType == CalpontSystemCatalog::BLOB;
|
||||
|
||||
Field* field = table->field[colpos];
|
||||
|
||||
uint32_t colWidthInBytes = isBlob ? ci.columnTypes[colpos].colWidth :
|
||||
ci.columnTypes[colpos].colWidth * field->charset()->mbmaxlen;
|
||||
|
||||
if (!isBlob && field->char_length() == 65535)
|
||||
{
|
||||
// Special case for TEXT field without an explicit length,
|
||||
// such as:
|
||||
// CREATE TABLE mcol4364 (a TEXT);
|
||||
// Here, char_length() represents the number of bytes,
|
||||
// not number of characters.
|
||||
dataLength = *(uint16_t*) buf;
|
||||
buf += 2;
|
||||
}
|
||||
else if (colWidthInBytes < 256)
|
||||
{
|
||||
dataLength = *(uint8_t*) buf;
|
||||
buf++;
|
||||
@ -1827,7 +1774,7 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
|
||||
ucharptr = (uchar*)*dataptr;
|
||||
buf += sizeof(uintptr_t);
|
||||
|
||||
if (ci.columnTypes[colpos].colDataType == CalpontSystemCatalog::BLOB)
|
||||
if (isBlob)
|
||||
{
|
||||
for (uint32_t i = 0; i < dataLength; i++)
|
||||
{
|
||||
|
@ -3208,7 +3208,6 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
|
||||
tableName.schema = table->s->db.str;
|
||||
tableName.table = table->s->table_name.str;
|
||||
ci->useXbit = false;
|
||||
ci->utf8 = false;
|
||||
CalpontSystemCatalog::RIDList colrids;
|
||||
|
||||
try
|
||||
@ -3253,11 +3252,6 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
|
||||
else
|
||||
ci->headerLength = (1 + colrids.size() + 7 - numberNotNull) / 8;
|
||||
|
||||
if ((strncmp(table->s->table_charset->comment, "UTF-8", 5) == 0) || (strncmp(table->s->table_charset->comment, "utf-8", 5) == 0))
|
||||
{
|
||||
ci->utf8 = true;
|
||||
}
|
||||
|
||||
//Log the statement to debug.log
|
||||
{
|
||||
ostringstream oss;
|
||||
|
@ -259,7 +259,6 @@ struct cal_connection_info
|
||||
filePtr(0),
|
||||
headerLength(0),
|
||||
useXbit(false),
|
||||
utf8(false),
|
||||
useCpimport(1),
|
||||
delimiter('\7'),
|
||||
affectedRows(0)
|
||||
@ -327,7 +326,6 @@ struct cal_connection_info
|
||||
FILE* filePtr;
|
||||
uint8_t headerLength;
|
||||
bool useXbit;
|
||||
bool utf8;
|
||||
uint8_t useCpimport;
|
||||
char delimiter;
|
||||
char enclosed_by;
|
||||
|
Reference in New Issue
Block a user