1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-04 04:42:30 +03:00

MCOL-4320/4364/4370 Fix multibyte processing for LDI/Insert...Select

For CHAR/VARCHAR/TEXT fields, the buffer size of a field represents
the field size in bytes, which can be bigger than the field size in
number of characters for multi-byte character sets such as utf8 and
utf8mb4. The buffer also contains a length prefix that stores the data
length in bytes; that length can be up to 65532 for a VARCHAR field, and
much higher for a TEXT field (we process a maximum byte length for a
TEXT field which fits in 4 bytes, i.e. 2^32 - 1 bytes, just under 4GB).

There is also special processing for a TEXT field defined without an
explicit length (i.e. with the default length), like so:
  CREATE TABLE cs1 (a TEXT CHARACTER SET utf8)
Here, the byte length is a fixed 65535, irrespective of the character
set used. This is different from a case such as:
  CREATE TABLE cs1 (a TEXT(65535) CHARACTER SET utf8)
where the byte length for the field will be 65535*3.
This commit is contained in:
Gagan Goel
2020-10-26 17:51:24 +00:00
parent 26131a1b43
commit 13264feb7d
3 changed files with 80 additions and 141 deletions

View File

@ -887,6 +887,11 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
case CalpontSystemCatalog::CHAR: case CalpontSystemCatalog::CHAR:
{ {
Field* field = table->field[colpos];
uint32_t colWidthInBytes =
ci.columnTypes[colpos].colWidth * field->charset()->mbmaxlen;
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT)) if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
{ {
fprintf(ci.filePtr, "%c", ci.delimiter); fprintf(ci.filePtr, "%c", ci.delimiter);
@ -896,10 +901,7 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
if (current_thd->variables.sql_mode & MODE_PAD_CHAR_TO_FULL_LENGTH) if (current_thd->variables.sql_mode & MODE_PAD_CHAR_TO_FULL_LENGTH)
{ {
// Pad to the full length of the field // Pad to the full length of the field
if (ci.utf8) escape.assign((char*)buf, colWidthInBytes);
escape.assign((char*)buf, ci.columnTypes[colpos].colWidth * 3);
else
escape.assign((char*)buf, ci.columnTypes[colpos].colWidth);
boost::replace_all(escape, "\\", "\\\\"); boost::replace_all(escape, "\\", "\\\\");
@ -922,23 +924,23 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
} }
} }
if (ci.utf8) buf += colWidthInBytes;
buf += (ci.columnTypes[colpos].colWidth * 3);
else
buf += ci.columnTypes[colpos].colWidth;
break; break;
} }
case CalpontSystemCatalog::VARCHAR: case CalpontSystemCatalog::VARCHAR:
{ {
Field* field = table->field[colpos];
uint32_t colWidthInBytes =
ci.columnTypes[colpos].colWidth * field->charset()->mbmaxlen;
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT)) if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
{ {
fprintf(ci.filePtr, "%c", ci.delimiter); fprintf(ci.filePtr, "%c", ci.delimiter);
if (!ci.utf8) if (colWidthInBytes < 256)
{
if (ci.columnTypes[colpos].colWidth < 256)
{ {
buf++; buf++;
} }
@ -947,41 +949,14 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
buf = buf + 2 ; buf = buf + 2 ;
} }
} }
else //utf8
{
if (ci.columnTypes[colpos].colWidth < 86)
{
buf++;
}
else else
{ {
buf = buf + 2 ; // Maximum number of bytes allowed for a VARCHAR
} // field is 65532, so the max length fits in 2 bytes.
} // dataLength is length in bytes, not length in chars
} uint16_t dataLength = 0;
else
{
int dataLength = 0;
if (!ci.utf8) if (colWidthInBytes < 256)
{
if (ci.columnTypes[colpos].colWidth < 256)
{
dataLength = *(uint8_t*) buf;
buf++;
}
else
{
dataLength = *(uint16_t*) buf;
buf = buf + 2 ;
}
escape.assign((char*)buf, dataLength);
boost::replace_all(escape, "\\", "\\\\");
fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(), escape.c_str(), ci.enclosed_by, ci.delimiter);
}
else //utf8
{
if (ci.columnTypes[colpos].colWidth < 86)
{ {
dataLength = *(uint8_t*) buf; dataLength = *(uint8_t*) buf;
buf++; buf++;
@ -994,14 +969,11 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
escape.assign((char*)buf, dataLength); escape.assign((char*)buf, dataLength);
boost::replace_all(escape, "\\", "\\\\"); boost::replace_all(escape, "\\", "\\\\");
fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(), escape.c_str(), ci.enclosed_by, ci.delimiter); fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(), escape.c_str(), ci.enclosed_by, ci.delimiter);
} }
}
if (ci.utf8) buf += colWidthInBytes;
buf += (ci.columnTypes[colpos].colWidth * 3);
else
buf += ci.columnTypes[colpos].colWidth;
break; break;
} }
@ -1695,12 +1667,11 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
case CalpontSystemCatalog::VARBINARY: case CalpontSystemCatalog::VARBINARY:
{ {
// For a VARBINARY field, ci.columnTypes[colpos].colWidth == colWidthInBytes
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT)) if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
{ {
fprintf(ci.filePtr, "%c", ci.delimiter); fprintf(ci.filePtr, "%c", ci.delimiter);
if (!ci.utf8)
{
if (ci.columnTypes[colpos].colWidth < 256) if (ci.columnTypes[colpos].colWidth < 256)
{ {
buf++; buf++;
@ -1710,50 +1681,16 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
buf = buf + 2; buf = buf + 2;
} }
} }
else //utf8
{
if (ci.columnTypes[colpos].colWidth < 86)
{
buf++;
}
else else
{ {
buf = buf + 2 ; // Maximum number of bytes allowed for a VARBINARY
} // field is 65532, so the max length fits in 2 bytes.
} // dataLength is length in bytes, not length in chars
} uint16_t dataLength = 0;
else
{
int dataLength = 0;
if (!ci.utf8)
{
if (ci.columnTypes[colpos].colWidth < 256) if (ci.columnTypes[colpos].colWidth < 256)
{ {
dataLength = *(int8_t*) buf; dataLength = *(uint8_t*) buf;
buf++;
}
else
{
dataLength = *(int16_t*) buf;
buf = buf + 2 ;
}
const uchar* tmpBuf = buf;
for (int32_t i = 0; i < dataLength; i++)
{
fprintf(ci.filePtr, "%02x", *(uint8_t*)tmpBuf);
tmpBuf++;
}
fprintf(ci.filePtr, "%c", ci.delimiter);
}
else //utf8
{
if (ci.columnTypes[colpos].colWidth < 86)
{
dataLength = *(int8_t*) buf;
buf++; buf++;
} }
else else
@ -1762,9 +1699,6 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
buf = buf + 2 ; buf = buf + 2 ;
} }
if ( dataLength > ci.columnTypes[colpos].colWidth)
dataLength = ci.columnTypes[colpos].colWidth;
const uchar* tmpBuf = buf; const uchar* tmpBuf = buf;
for (int32_t i = 0; i < dataLength; i++) for (int32_t i = 0; i < dataLength; i++)
@ -1775,11 +1709,7 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
fprintf(ci.filePtr, "%c", ci.delimiter); fprintf(ci.filePtr, "%c", ci.delimiter);
} }
}
if (ci.utf8)
buf += (ci.columnTypes[colpos].colWidth * 3);
else
buf += ci.columnTypes[colpos].colWidth; buf += ci.columnTypes[colpos].colWidth;
break; break;
@ -1791,13 +1721,30 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
// MCOL-4005 Note that we don't handle nulls as a special // MCOL-4005 Note that we don't handle nulls as a special
// case here as we do for other datatypes, the below works // case here as we do for other datatypes, the below works
// as expected for nulls. // as expected for nulls.
// dataLength is length in bytes, not length in chars
uint32_t dataLength = 0; uint32_t dataLength = 0;
uintptr_t* dataptr; uintptr_t* dataptr;
uchar* ucharptr; uchar* ucharptr;
uint colWidthInBytes = (ci.utf8 ?
ci.columnTypes[colpos].colWidth * 3: ci.columnTypes[colpos].colWidth);
if (colWidthInBytes < 256) bool isBlob =
ci.columnTypes[colpos].colDataType == CalpontSystemCatalog::BLOB;
Field* field = table->field[colpos];
uint32_t colWidthInBytes = isBlob ? ci.columnTypes[colpos].colWidth :
ci.columnTypes[colpos].colWidth * field->charset()->mbmaxlen;
if (!isBlob && field->char_length() == 65535)
{
// Special case for TEXT field without an explicit length,
// such as:
// CREATE TABLE mcol4364 (a TEXT);
// Here, char_length() represents the number of bytes,
// not number of characters.
dataLength = *(uint16_t*) buf;
buf += 2;
}
else if (colWidthInBytes < 256)
{ {
dataLength = *(uint8_t*) buf; dataLength = *(uint8_t*) buf;
buf++; buf++;
@ -1827,7 +1774,7 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
ucharptr = (uchar*)*dataptr; ucharptr = (uchar*)*dataptr;
buf += sizeof(uintptr_t); buf += sizeof(uintptr_t);
if (ci.columnTypes[colpos].colDataType == CalpontSystemCatalog::BLOB) if (isBlob)
{ {
for (uint32_t i = 0; i < dataLength; i++) for (uint32_t i = 0; i < dataLength; i++)
{ {

View File

@ -3208,7 +3208,6 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
tableName.schema = table->s->db.str; tableName.schema = table->s->db.str;
tableName.table = table->s->table_name.str; tableName.table = table->s->table_name.str;
ci->useXbit = false; ci->useXbit = false;
ci->utf8 = false;
CalpontSystemCatalog::RIDList colrids; CalpontSystemCatalog::RIDList colrids;
try try
@ -3253,11 +3252,6 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
else else
ci->headerLength = (1 + colrids.size() + 7 - numberNotNull) / 8; ci->headerLength = (1 + colrids.size() + 7 - numberNotNull) / 8;
if ((strncmp(table->s->table_charset->comment, "UTF-8", 5) == 0) || (strncmp(table->s->table_charset->comment, "utf-8", 5) == 0))
{
ci->utf8 = true;
}
//Log the statement to debug.log //Log the statement to debug.log
{ {
ostringstream oss; ostringstream oss;

View File

@ -259,7 +259,6 @@ struct cal_connection_info
filePtr(0), filePtr(0),
headerLength(0), headerLength(0),
useXbit(false), useXbit(false),
utf8(false),
useCpimport(1), useCpimport(1),
delimiter('\7'), delimiter('\7'),
affectedRows(0) affectedRows(0)
@ -327,7 +326,6 @@ struct cal_connection_info
FILE* filePtr; FILE* filePtr;
uint8_t headerLength; uint8_t headerLength;
bool useXbit; bool useXbit;
bool utf8;
uint8_t useCpimport; uint8_t useCpimport;
char delimiter; char delimiter;
char enclosed_by; char enclosed_by;