You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-04 04:42:30 +03:00
MCOL-4320/4364/4370 Fix multibyte processing for LDI/Insert...Select
For CHAR/VARCHAR/TEXT fields, the buffer size of a field is the field size in bytes, which can be larger than the field size in characters for multi-byte character sets such as utf8 and utf8mb4. The buffer also contains a length prefix giving the data length in bytes; that length can be up to 65532 for a VARCHAR field (so it fits in 2 bytes), and much higher for a TEXT field (the largest TEXT byte length we process is one that fits in 4 bytes, i.e. 2^32 - 1, roughly 4GB). There is also special processing for a TEXT field declared with the default length (no explicit length), like so: CREATE TABLE cs1 (a TEXT CHARACTER SET utf8). Here, the byte length is a fixed 65535, irrespective of the character set used. This is different from a case such as CREATE TABLE cs1 (a TEXT(65535) CHARACTER SET utf8), where the byte length for the field will be 65535*3.
This commit is contained in:
@ -887,6 +887,11 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
|
|||||||
|
|
||||||
case CalpontSystemCatalog::CHAR:
|
case CalpontSystemCatalog::CHAR:
|
||||||
{
|
{
|
||||||
|
Field* field = table->field[colpos];
|
||||||
|
|
||||||
|
uint32_t colWidthInBytes =
|
||||||
|
ci.columnTypes[colpos].colWidth * field->charset()->mbmaxlen;
|
||||||
|
|
||||||
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
|
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
|
||||||
{
|
{
|
||||||
fprintf(ci.filePtr, "%c", ci.delimiter);
|
fprintf(ci.filePtr, "%c", ci.delimiter);
|
||||||
@ -896,10 +901,7 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
|
|||||||
if (current_thd->variables.sql_mode & MODE_PAD_CHAR_TO_FULL_LENGTH)
|
if (current_thd->variables.sql_mode & MODE_PAD_CHAR_TO_FULL_LENGTH)
|
||||||
{
|
{
|
||||||
// Pad to the full length of the field
|
// Pad to the full length of the field
|
||||||
if (ci.utf8)
|
escape.assign((char*)buf, colWidthInBytes);
|
||||||
escape.assign((char*)buf, ci.columnTypes[colpos].colWidth * 3);
|
|
||||||
else
|
|
||||||
escape.assign((char*)buf, ci.columnTypes[colpos].colWidth);
|
|
||||||
|
|
||||||
boost::replace_all(escape, "\\", "\\\\");
|
boost::replace_all(escape, "\\", "\\\\");
|
||||||
|
|
||||||
@ -922,86 +924,56 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ci.utf8)
|
buf += colWidthInBytes;
|
||||||
buf += (ci.columnTypes[colpos].colWidth * 3);
|
|
||||||
else
|
|
||||||
buf += ci.columnTypes[colpos].colWidth;
|
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case CalpontSystemCatalog::VARCHAR:
|
case CalpontSystemCatalog::VARCHAR:
|
||||||
{
|
{
|
||||||
|
Field* field = table->field[colpos];
|
||||||
|
|
||||||
|
uint32_t colWidthInBytes =
|
||||||
|
ci.columnTypes[colpos].colWidth * field->charset()->mbmaxlen;
|
||||||
|
|
||||||
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
|
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
|
||||||
{
|
{
|
||||||
fprintf(ci.filePtr, "%c", ci.delimiter);
|
fprintf(ci.filePtr, "%c", ci.delimiter);
|
||||||
|
|
||||||
if (!ci.utf8)
|
if (colWidthInBytes < 256)
|
||||||
{
|
{
|
||||||
if (ci.columnTypes[colpos].colWidth < 256)
|
buf++;
|
||||||
{
|
|
||||||
buf++;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
buf = buf + 2 ;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else //utf8
|
else
|
||||||
{
|
{
|
||||||
if (ci.columnTypes[colpos].colWidth < 86)
|
buf = buf + 2 ;
|
||||||
{
|
|
||||||
buf++;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
buf = buf + 2 ;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
int dataLength = 0;
|
// Maximum number of bytes allowed for a VARCHAR
|
||||||
|
// field is 65532, so the max length fits in 2 bytes.
|
||||||
|
// dataLength is length in bytes, not length in chars
|
||||||
|
uint16_t dataLength = 0;
|
||||||
|
|
||||||
if (!ci.utf8)
|
if (colWidthInBytes < 256)
|
||||||
{
|
{
|
||||||
if (ci.columnTypes[colpos].colWidth < 256)
|
dataLength = *(uint8_t*) buf;
|
||||||
{
|
buf++;
|
||||||
dataLength = *(uint8_t*) buf;
|
|
||||||
buf++;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
dataLength = *(uint16_t*) buf;
|
|
||||||
buf = buf + 2 ;
|
|
||||||
}
|
|
||||||
escape.assign((char*)buf, dataLength);
|
|
||||||
boost::replace_all(escape, "\\", "\\\\");
|
|
||||||
fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(), escape.c_str(), ci.enclosed_by, ci.delimiter);
|
|
||||||
}
|
}
|
||||||
else //utf8
|
else
|
||||||
{
|
{
|
||||||
if (ci.columnTypes[colpos].colWidth < 86)
|
dataLength = *(uint16_t*) buf;
|
||||||
{
|
buf = buf + 2 ;
|
||||||
dataLength = *(uint8_t*) buf;
|
|
||||||
buf++;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
dataLength = *(uint16_t*) buf;
|
|
||||||
buf = buf + 2 ;
|
|
||||||
}
|
|
||||||
|
|
||||||
escape.assign((char*)buf, dataLength);
|
|
||||||
boost::replace_all(escape, "\\", "\\\\");
|
|
||||||
|
|
||||||
fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(), escape.c_str(), ci.enclosed_by, ci.delimiter);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
escape.assign((char*)buf, dataLength);
|
||||||
|
boost::replace_all(escape, "\\", "\\\\");
|
||||||
|
fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(), escape.c_str(), ci.enclosed_by, ci.delimiter);
|
||||||
}
|
}
|
||||||
if (ci.utf8)
|
|
||||||
buf += (ci.columnTypes[colpos].colWidth * 3);
|
buf += colWidthInBytes;
|
||||||
else
|
|
||||||
buf += ci.columnTypes[colpos].colWidth;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1695,92 +1667,50 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
|
|||||||
|
|
||||||
case CalpontSystemCatalog::VARBINARY:
|
case CalpontSystemCatalog::VARBINARY:
|
||||||
{
|
{
|
||||||
|
// For a VARBINARY field, ci.columnTypes[colpos].colWidth == colWidthInBytes
|
||||||
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
|
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
|
||||||
{
|
{
|
||||||
fprintf(ci.filePtr, "%c", ci.delimiter);
|
fprintf(ci.filePtr, "%c", ci.delimiter);
|
||||||
|
|
||||||
if (!ci.utf8)
|
if (ci.columnTypes[colpos].colWidth < 256)
|
||||||
{
|
{
|
||||||
if (ci.columnTypes[colpos].colWidth < 256)
|
buf++;
|
||||||
{
|
|
||||||
buf++;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
buf = buf + 2;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else //utf8
|
else
|
||||||
{
|
{
|
||||||
if (ci.columnTypes[colpos].colWidth < 86)
|
buf = buf + 2;
|
||||||
{
|
|
||||||
buf++;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
buf = buf + 2 ;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
int dataLength = 0;
|
// Maximum number of bytes allowed for a VARBINARY
|
||||||
|
// field is 65532, so the max length fits in 2 bytes.
|
||||||
|
// dataLength is length in bytes, not length in chars
|
||||||
|
uint16_t dataLength = 0;
|
||||||
|
|
||||||
if (!ci.utf8)
|
if (ci.columnTypes[colpos].colWidth < 256)
|
||||||
{
|
{
|
||||||
if (ci.columnTypes[colpos].colWidth < 256)
|
dataLength = *(uint8_t*) buf;
|
||||||
{
|
buf++;
|
||||||
dataLength = *(int8_t*) buf;
|
|
||||||
buf++;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
dataLength = *(int16_t*) buf;
|
|
||||||
buf = buf + 2 ;
|
|
||||||
}
|
|
||||||
|
|
||||||
const uchar* tmpBuf = buf;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < dataLength; i++)
|
|
||||||
{
|
|
||||||
fprintf(ci.filePtr, "%02x", *(uint8_t*)tmpBuf);
|
|
||||||
tmpBuf++;
|
|
||||||
}
|
|
||||||
|
|
||||||
fprintf(ci.filePtr, "%c", ci.delimiter);
|
|
||||||
}
|
}
|
||||||
else //utf8
|
else
|
||||||
{
|
{
|
||||||
if (ci.columnTypes[colpos].colWidth < 86)
|
dataLength = *(uint16_t*) buf;
|
||||||
{
|
buf = buf + 2 ;
|
||||||
dataLength = *(int8_t*) buf;
|
|
||||||
buf++;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
dataLength = *(uint16_t*) buf;
|
|
||||||
buf = buf + 2 ;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ( dataLength > ci.columnTypes[colpos].colWidth)
|
|
||||||
dataLength = ci.columnTypes[colpos].colWidth;
|
|
||||||
|
|
||||||
const uchar* tmpBuf = buf;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < dataLength; i++)
|
|
||||||
{
|
|
||||||
fprintf(ci.filePtr, "%02x", *(uint8_t*)tmpBuf);
|
|
||||||
tmpBuf++;
|
|
||||||
}
|
|
||||||
|
|
||||||
fprintf(ci.filePtr, "%c", ci.delimiter);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const uchar* tmpBuf = buf;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < dataLength; i++)
|
||||||
|
{
|
||||||
|
fprintf(ci.filePtr, "%02x", *(uint8_t*)tmpBuf);
|
||||||
|
tmpBuf++;
|
||||||
|
}
|
||||||
|
|
||||||
|
fprintf(ci.filePtr, "%c", ci.delimiter);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ci.utf8)
|
buf += ci.columnTypes[colpos].colWidth;
|
||||||
buf += (ci.columnTypes[colpos].colWidth * 3);
|
|
||||||
else
|
|
||||||
buf += ci.columnTypes[colpos].colWidth;
|
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -1791,13 +1721,30 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
|
|||||||
// MCOL-4005 Note that we don't handle nulls as a special
|
// MCOL-4005 Note that we don't handle nulls as a special
|
||||||
// case here as we do for other datatypes, the below works
|
// case here as we do for other datatypes, the below works
|
||||||
// as expected for nulls.
|
// as expected for nulls.
|
||||||
|
// dataLength is length in bytes, not length in chars
|
||||||
uint32_t dataLength = 0;
|
uint32_t dataLength = 0;
|
||||||
uintptr_t* dataptr;
|
uintptr_t* dataptr;
|
||||||
uchar* ucharptr;
|
uchar* ucharptr;
|
||||||
uint colWidthInBytes = (ci.utf8 ?
|
|
||||||
ci.columnTypes[colpos].colWidth * 3: ci.columnTypes[colpos].colWidth);
|
|
||||||
|
|
||||||
if (colWidthInBytes < 256)
|
bool isBlob =
|
||||||
|
ci.columnTypes[colpos].colDataType == CalpontSystemCatalog::BLOB;
|
||||||
|
|
||||||
|
Field* field = table->field[colpos];
|
||||||
|
|
||||||
|
uint32_t colWidthInBytes = isBlob ? ci.columnTypes[colpos].colWidth :
|
||||||
|
ci.columnTypes[colpos].colWidth * field->charset()->mbmaxlen;
|
||||||
|
|
||||||
|
if (!isBlob && field->char_length() == 65535)
|
||||||
|
{
|
||||||
|
// Special case for TEXT field without default length,
|
||||||
|
// such as:
|
||||||
|
// CREATE TABLE mcol4364 (a TEXT);
|
||||||
|
// Here, char_length() represents the number of bytes,
|
||||||
|
// not number of characters.
|
||||||
|
dataLength = *(uint16_t*) buf;
|
||||||
|
buf += 2;
|
||||||
|
}
|
||||||
|
else if (colWidthInBytes < 256)
|
||||||
{
|
{
|
||||||
dataLength = *(uint8_t*) buf;
|
dataLength = *(uint8_t*) buf;
|
||||||
buf++;
|
buf++;
|
||||||
@ -1827,7 +1774,7 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
|
|||||||
ucharptr = (uchar*)*dataptr;
|
ucharptr = (uchar*)*dataptr;
|
||||||
buf += sizeof(uintptr_t);
|
buf += sizeof(uintptr_t);
|
||||||
|
|
||||||
if (ci.columnTypes[colpos].colDataType == CalpontSystemCatalog::BLOB)
|
if (isBlob)
|
||||||
{
|
{
|
||||||
for (uint32_t i = 0; i < dataLength; i++)
|
for (uint32_t i = 0; i < dataLength; i++)
|
||||||
{
|
{
|
||||||
|
@ -3208,7 +3208,6 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
|
|||||||
tableName.schema = table->s->db.str;
|
tableName.schema = table->s->db.str;
|
||||||
tableName.table = table->s->table_name.str;
|
tableName.table = table->s->table_name.str;
|
||||||
ci->useXbit = false;
|
ci->useXbit = false;
|
||||||
ci->utf8 = false;
|
|
||||||
CalpontSystemCatalog::RIDList colrids;
|
CalpontSystemCatalog::RIDList colrids;
|
||||||
|
|
||||||
try
|
try
|
||||||
@ -3253,11 +3252,6 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
|
|||||||
else
|
else
|
||||||
ci->headerLength = (1 + colrids.size() + 7 - numberNotNull) / 8;
|
ci->headerLength = (1 + colrids.size() + 7 - numberNotNull) / 8;
|
||||||
|
|
||||||
if ((strncmp(table->s->table_charset->comment, "UTF-8", 5) == 0) || (strncmp(table->s->table_charset->comment, "utf-8", 5) == 0))
|
|
||||||
{
|
|
||||||
ci->utf8 = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
//Log the statement to debug.log
|
//Log the statement to debug.log
|
||||||
{
|
{
|
||||||
ostringstream oss;
|
ostringstream oss;
|
||||||
|
@ -259,7 +259,6 @@ struct cal_connection_info
|
|||||||
filePtr(0),
|
filePtr(0),
|
||||||
headerLength(0),
|
headerLength(0),
|
||||||
useXbit(false),
|
useXbit(false),
|
||||||
utf8(false),
|
|
||||||
useCpimport(1),
|
useCpimport(1),
|
||||||
delimiter('\7'),
|
delimiter('\7'),
|
||||||
affectedRows(0)
|
affectedRows(0)
|
||||||
@ -327,7 +326,6 @@ struct cal_connection_info
|
|||||||
FILE* filePtr;
|
FILE* filePtr;
|
||||||
uint8_t headerLength;
|
uint8_t headerLength;
|
||||||
bool useXbit;
|
bool useXbit;
|
||||||
bool utf8;
|
|
||||||
uint8_t useCpimport;
|
uint8_t useCpimport;
|
||||||
char delimiter;
|
char delimiter;
|
||||||
char enclosed_by;
|
char enclosed_by;
|
||||||
|
Reference in New Issue
Block a user