1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

Merge for 13264feb7 that is the fix for multiple LDI issues described in MCOL-4320/4364/4370

This commit is contained in:
Roman Nozdrin
2020-11-17 11:38:25 +00:00
parent 58495d0d2f
commit 31e0909552
3 changed files with 70 additions and 149 deletions

View File

@ -191,8 +191,11 @@ class WriteBatchFieldMariaDB: public WriteBatchField
public:
Field *m_field;
const CalpontSystemCatalog::ColType &m_type;
WriteBatchFieldMariaDB(Field *field, const CalpontSystemCatalog::ColType type)
:m_field(field), m_type(type)
uint32_t m_mbmaxlen;
WriteBatchFieldMariaDB(Field *field,
const CalpontSystemCatalog::ColType type,
uint32_t mbmaxlen)
:m_field(field), m_type(type), m_mbmaxlen(mbmaxlen)
{ }
size_t ColWriteBatchDate(const uchar *buf, bool nullVal, ColBatchWriter &ci) override
{
@ -328,6 +331,7 @@ public:
size_t ColWriteBatchChar(const uchar *buf, bool nullVal, ColBatchWriter &ci) override
{
uint32_t colWidthInBytes = m_type.colWidth * m_mbmaxlen;
if (nullVal && (m_type.constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
{
fprintf(ci.filePtr(), "%c", ci.delimiter());
@ -338,10 +342,7 @@ public:
{
std::string escape;
// Pad to the full length of the field
if (ci.utf8())
escape.assign((char*)buf, m_type.colWidth * 3);
else
escape.assign((char*)buf, m_type.colWidth);
escape.assign((char*)buf, colWidthInBytes);
boost::replace_all(escape, "\\", "\\\\");
@ -368,93 +369,48 @@ public:
}
}
if (ci.utf8())
return m_type.colWidth * 3;
else
return m_type.colWidth;
return colWidthInBytes;
}
size_t ColWriteBatchVarchar(const uchar *buf, bool nullVal, ColBatchWriter &ci) override
{
const uchar *buf0= buf;
uint32_t colWidthInBytes = m_type.colWidth * m_mbmaxlen;
if (nullVal && (m_type.constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
{
fprintf(ci.filePtr(), "%c", ci.delimiter());
if (!ci.utf8())
if (colWidthInBytes < 256)
{
if (m_type.colWidth < 256)
{
buf++;
}
else
{
buf = buf + 2 ;
}
buf++;
}
else //utf8
else
{
if (m_type.colWidth < 86)
{
buf++;
}
else
{
buf = buf + 2 ;
}
buf = buf + 2 ;
}
}
else
{
int dataLength = 0;
if (!ci.utf8())
if (colWidthInBytes < 256)
{
if (m_type.colWidth < 256)
{
dataLength = *(uint8_t*) buf;
buf++;
}
else
{
dataLength = *(uint16_t*) buf;
buf = buf + 2 ;
}
std::string escape;
escape.assign((char*)buf, dataLength);
boost::replace_all(escape, "\\", "\\\\");
fprintf(ci.filePtr(), "%c%.*s%c%c",
ci.enclosed_by(),
(int)escape.length(), escape.c_str(),
ci.enclosed_by(), ci.delimiter());
}
else //utf8
else
{
if (m_type.colWidth < 86)
{
dataLength = *(uint8_t*) buf;
buf++;
}
else
{
dataLength = *(uint16_t*) buf;
buf = buf + 2 ;
}
std::string escape;
escape.assign((char*)buf, dataLength);
boost::replace_all(escape, "\\", "\\\\");
fprintf(ci.filePtr(), "%c%.*s%c%c",
ci.enclosed_by(),
(int)escape.length(), escape.c_str(),
ci.enclosed_by(), ci.delimiter());
}
std::string escape;
escape.assign((char*)buf, dataLength);
boost::replace_all(escape, "\\", "\\\\");
fprintf(ci.filePtr(), "%c%.*s%c%c",
ci.enclosed_by(),
(int)escape.length(), escape.c_str(),
ci.enclosed_by(), ci.delimiter());
}
if (ci.utf8())
buf += (m_type.colWidth * 3);
else
buf += m_type.colWidth;
buf += colWidthInBytes;
return buf - buf0;
}
@ -1125,89 +1081,42 @@ public:
{
fprintf(ci.filePtr(), "%c", ci.delimiter());
if (!ci.utf8())
if (m_type.colWidth < 256)
{
if (m_type.colWidth < 256)
{
buf++;
}
else
{
buf = buf + 2;
}
buf++;
}
else //utf8
else
{
if (m_type.colWidth < 86)
{
buf++;
}
else
{
buf = buf + 2 ;
}
buf = buf + 2;
}
}
else
{
int dataLength = 0;
uint16_t dataLength = 0;
if (!ci.utf8())
if (m_type.colWidth < 256)
{
if (m_type.colWidth < 256)
{
dataLength = *(int8_t*) buf;
buf++;
}
else
{
dataLength = *(int16_t*) buf;
buf = buf + 2 ;
}
const uchar* tmpBuf = buf;
for (int32_t i = 0; i < dataLength; i++)
{
fprintf(ci.filePtr(), "%02x", *(uint8_t*)tmpBuf);
tmpBuf++;
}
fprintf(ci.filePtr(), "%c", ci.delimiter());
dataLength = *(int8_t*) buf;
buf++;
}
else //utf8
else
{
if (m_type.colWidth < 86)
{
dataLength = *(int8_t*) buf;
buf++;
}
else
{
dataLength = *(uint16_t*) buf;
buf = buf + 2 ;
}
if ( dataLength > m_type.colWidth)
dataLength = m_type.colWidth;
const uchar* tmpBuf = buf;
for (int32_t i = 0; i < dataLength; i++)
{
fprintf(ci.filePtr(), "%02x", *(uint8_t*)tmpBuf);
tmpBuf++;
}
fprintf(ci.filePtr(), "%c", ci.delimiter());
dataLength = *(int16_t*) buf;
buf = buf + 2 ;
}
const uchar* tmpBuf = buf;
for (int32_t i = 0; i < dataLength; i++)
{
fprintf(ci.filePtr(), "%02x", *(uint8_t*)tmpBuf);
tmpBuf++;
}
fprintf(ci.filePtr(), "%c", ci.delimiter());
}
if (ci.utf8())
buf += (m_type.colWidth * 3); // QQ: why? It is varbinary!
else
buf += m_type.colWidth;
return buf - buf0;
}
@ -1221,10 +1130,20 @@ public:
uint32_t dataLength = 0;
uintptr_t* dataptr;
uchar* ucharptr;
uint colWidthInBytes = (ci.utf8() ?
m_type.colWidth * 3: m_type.colWidth);
bool isBlob = m_type.colDataType == CalpontSystemCatalog::BLOB;
uint colWidthInBytes = isBlob ? m_type.colWidth : m_type.colWidth * m_mbmaxlen;
if (colWidthInBytes < 256)
if (!isBlob && m_field->char_length() == 65535)
{
// Special case for TEXT field without default length,
// such as:
// CREATE TABLE mcol4364 (a TEXT);
// Here, char_length() represents the number of bytes,
// not number of characters.
dataLength = *(uint16_t*) buf;
buf += 2;
}
else if (colWidthInBytes < 256)
{
dataLength = *(uint8_t*) buf;
buf++;
@ -1254,7 +1173,7 @@ public:
ucharptr = (uchar*)*dataptr;
buf += sizeof(uintptr_t);
if (m_type.colDataType == CalpontSystemCatalog::BLOB)
if (isBlob)
{
for (uint32_t i = 0; i < dataLength; i++)
{

View File

@ -736,9 +736,15 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
const datatypes::TypeHandler *h= colType.typeHandler();
if (h) // QQ: error reporting
{
datatypes::ColBatchWriter writer(ci.filePtr, ci.delimiter,
ci.enclosed_by, ci.utf8);
datatypes::WriteBatchFieldMariaDB field(table->field[colpos], colType);
datatypes::ColBatchWriter writer(ci.filePtr,
ci.delimiter,
ci.enclosed_by);
Field* fieldPtr= table->field[colpos];
uint32_t mbmaxlen = (fieldPtr->charset() && fieldPtr->charset()->mbmaxlen)
? fieldPtr->charset()->mbmaxlen : 0;
datatypes::WriteBatchFieldMariaDB field(fieldPtr,
colType,
mbmaxlen);
idbassert(table == table->field[colpos]->table);
buf+= h->ColWriteBatch(&field, buf, nullVal, writer);
}