You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
MCOL-4496 Upmerged the commit 7fa5ca3a6
to generalise CHAR|VARCHAR|TEXT|BLOB
bulk insertion processing and fix the original issue introduced by MCOL-2000
This commit is contained in:
@ -185,7 +185,6 @@ public:
|
||||
|
||||
|
||||
/*******************************************************************************/
|
||||
|
||||
class WriteBatchFieldMariaDB: public WriteBatchField
|
||||
{
|
||||
// Maximum number of decimal digits that can be represented in 4 bytes
|
||||
@ -343,89 +342,76 @@ public:
|
||||
return m_field->pack_length();
|
||||
}
|
||||
|
||||
size_t ColWriteBatchChar(const uchar *buf, bool nullVal, ColBatchWriter &ci) override
|
||||
static inline void ColWriteBatchTextStringPrintout(std::string& escape,
|
||||
const ColBatchWriter &ci)
|
||||
{
|
||||
boost::replace_all(escape, "\\", "\\\\");
|
||||
fprintf(ci.filePtr(), "%c%.*s%c%c", ci.enclosed_by(), (int)escape.length(),
|
||||
escape.c_str(), ci.enclosed_by(), ci.delimiter());
|
||||
}
|
||||
|
||||
static void ColWriteBatchTextString(const String &value,
|
||||
const ColBatchWriter &ci,
|
||||
const size_t colWidthInBytes)
|
||||
{
|
||||
std::string escape;
|
||||
escape.assign(value.ptr(), value.length());
|
||||
ColWriteBatchTextStringPrintout(escape, ci);
|
||||
}
|
||||
|
||||
static void ColWriteBatchPaddedTextString(const String &value,
|
||||
const ColBatchWriter &ci,
|
||||
const size_t colWidthInBytes)
|
||||
{
|
||||
std::string escape;
|
||||
escape.assign(value.ptr(), colWidthInBytes);
|
||||
ColWriteBatchTextStringPrintout(escape, ci);
|
||||
}
|
||||
|
||||
static void ColWriteBatchBlobString(const String &value,
|
||||
const ColBatchWriter &ci,
|
||||
const size_t colWidthInBytes)
|
||||
{
|
||||
const char* ptr = value.ptr();
|
||||
for (uint32_t i = 0; i < value.length(); i++)
|
||||
{
|
||||
fprintf(ci.filePtr(), "%02x", *(uint8_t*)ptr+i);
|
||||
}
|
||||
fprintf(ci.filePtr(), "%c", ci.delimiter());
|
||||
}
|
||||
|
||||
size_t ColWriteBatchString(const uchar *buf,
|
||||
bool nullVal,
|
||||
ColBatchWriter &ci,
|
||||
void (*printFuncPtr)(const String&,
|
||||
const ColBatchWriter &,
|
||||
const size_t colWidthInBytes)) const
|
||||
{
|
||||
uint32_t colWidthInBytes = m_type.colWidth * m_mbmaxlen;
|
||||
if (nullVal && (m_type.constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
|
||||
{
|
||||
fprintf(ci.filePtr(), "%c", ci.delimiter());
|
||||
}
|
||||
else
|
||||
{
|
||||
if (current_thd->variables.sql_mode & MODE_PAD_CHAR_TO_FULL_LENGTH)
|
||||
{
|
||||
std::string escape;
|
||||
// Pad to the full length of the field
|
||||
escape.assign((char*)buf, colWidthInBytes);
|
||||
|
||||
boost::replace_all(escape, "\\", "\\\\");
|
||||
|
||||
fprintf(ci.filePtr(), "%c%.*s%c%c",
|
||||
ci.enclosed_by(),
|
||||
(int)escape.length(), escape.c_str(),
|
||||
ci.enclosed_by(), ci.delimiter());
|
||||
}
|
||||
else
|
||||
{
|
||||
std::string escape;
|
||||
// Get the actual data length
|
||||
bitmap_set_bit(m_field->table->read_set, m_field->field_index);
|
||||
String attribute;
|
||||
m_field->val_str(&attribute);
|
||||
|
||||
escape.assign((char*)buf, attribute.length());
|
||||
boost::replace_all(escape, "\\", "\\\\");
|
||||
|
||||
fprintf(ci.filePtr(), "%c%.*s%c%c",
|
||||
ci.enclosed_by(),
|
||||
(int)escape.length(), escape.c_str(),
|
||||
ci.enclosed_by(), ci.delimiter());
|
||||
}
|
||||
String value;
|
||||
// We need to set table->read_set for a field first.
|
||||
// This happens in ha_mcs_impl_start_bulk_insert().
|
||||
m_field->val_str(&value);
|
||||
(*printFuncPtr)(value, ci, m_field->pack_length());
|
||||
}
|
||||
return m_field->pack_length();
|
||||
}
|
||||
|
||||
return colWidthInBytes;
|
||||
size_t ColWriteBatchChar(const uchar *buf, bool nullVal, ColBatchWriter &ci) override
|
||||
{
|
||||
return (current_thd->variables.sql_mode & MODE_PAD_CHAR_TO_FULL_LENGTH) ?
|
||||
ColWriteBatchString(buf, nullVal, ci, &ColWriteBatchPaddedTextString) :
|
||||
ColWriteBatchString(buf, nullVal, ci, &ColWriteBatchTextString);
|
||||
}
|
||||
|
||||
size_t ColWriteBatchVarchar(const uchar *buf, bool nullVal, ColBatchWriter &ci) override
|
||||
{
|
||||
const uchar *buf0= buf;
|
||||
uint32_t colWidthInBytes = m_type.colWidth * m_mbmaxlen;
|
||||
if (nullVal && (m_type.constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
|
||||
{
|
||||
fprintf(ci.filePtr(), "%c", ci.delimiter());
|
||||
if (colWidthInBytes < 256)
|
||||
{
|
||||
buf++;
|
||||
}
|
||||
else
|
||||
{
|
||||
buf = buf + 2 ;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
int dataLength = 0;
|
||||
|
||||
if (colWidthInBytes < 256)
|
||||
{
|
||||
dataLength = *(uint8_t*) buf;
|
||||
buf++;
|
||||
}
|
||||
else
|
||||
{
|
||||
dataLength = *(uint16_t*) buf;
|
||||
buf = buf + 2 ;
|
||||
}
|
||||
std::string escape;
|
||||
escape.assign((char*)buf, dataLength);
|
||||
boost::replace_all(escape, "\\", "\\\\");
|
||||
fprintf(ci.filePtr(), "%c%.*s%c%c",
|
||||
ci.enclosed_by(),
|
||||
(int)escape.length(), escape.c_str(),
|
||||
ci.enclosed_by(), ci.delimiter());
|
||||
}
|
||||
buf += colWidthInBytes;
|
||||
return buf - buf0;
|
||||
return ColWriteBatchString(buf, nullVal, ci, &ColWriteBatchTextString);
|
||||
}
|
||||
|
||||
size_t ColWriteBatchSInt64(const uchar *buf, bool nullVal, ColBatchWriter &ci) override
|
||||
@ -854,126 +840,15 @@ public:
|
||||
|
||||
size_t ColWriteBatchVarbinary(const uchar *buf0, bool nullVal, ColBatchWriter &ci) override
|
||||
{
|
||||
const uchar *buf= buf0;
|
||||
if (nullVal && (m_type.constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
|
||||
{
|
||||
fprintf(ci.filePtr(), "%c", ci.delimiter());
|
||||
|
||||
if (m_type.colWidth < 256)
|
||||
{
|
||||
buf++;
|
||||
}
|
||||
else
|
||||
{
|
||||
buf = buf + 2;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
uint16_t dataLength = 0;
|
||||
|
||||
if (m_type.colWidth < 256)
|
||||
{
|
||||
dataLength = *(int8_t*) buf;
|
||||
buf++;
|
||||
}
|
||||
else
|
||||
{
|
||||
dataLength = *(int16_t*) buf;
|
||||
buf = buf + 2 ;
|
||||
}
|
||||
|
||||
const uchar* tmpBuf = buf;
|
||||
|
||||
for (int32_t i = 0; i < dataLength; i++)
|
||||
{
|
||||
fprintf(ci.filePtr(), "%02x", *(uint8_t*)tmpBuf);
|
||||
tmpBuf++;
|
||||
}
|
||||
|
||||
fprintf(ci.filePtr(), "%c", ci.delimiter());
|
||||
|
||||
}
|
||||
|
||||
return buf - buf0;
|
||||
return ColWriteBatchString(buf0, nullVal, ci, &ColWriteBatchBlobString);
|
||||
}
|
||||
|
||||
|
||||
size_t ColWriteBatchBlob(const uchar *buf0, bool nullVal, ColBatchWriter &ci) override
|
||||
{
|
||||
const uchar *buf= buf0;
|
||||
// MCOL-4005 Note that we don't handle nulls as a special
|
||||
// case here as we do for other datatypes, the below works
|
||||
// as expected for nulls.
|
||||
uint32_t dataLength = 0;
|
||||
uintptr_t* dataptr;
|
||||
uchar* ucharptr;
|
||||
bool isBlob = m_type.colDataType == CalpontSystemCatalog::BLOB;
|
||||
uint colWidthInBytes = isBlob ? m_type.colWidth : m_type.colWidth * m_mbmaxlen;
|
||||
|
||||
if (!isBlob && m_field->char_length() == 65535)
|
||||
{
|
||||
// Special case for TEXT field without default length,
|
||||
// such as:
|
||||
// CREATE TABLE mcol4364 (a TEXT);
|
||||
// Here, char_length() represents the number of bytes,
|
||||
// not number of characters.
|
||||
dataLength = *(uint16_t*) buf;
|
||||
buf += 2;
|
||||
}
|
||||
else if (colWidthInBytes < 256)
|
||||
{
|
||||
dataLength = *(uint8_t*) buf;
|
||||
buf++;
|
||||
}
|
||||
else if (colWidthInBytes < 65536)
|
||||
{
|
||||
dataLength = *(uint16_t*) buf;
|
||||
buf += 2;
|
||||
}
|
||||
else if (colWidthInBytes < 16777216)
|
||||
{
|
||||
dataLength = *(uint16_t*) buf;
|
||||
dataLength |= ((int) buf[2]) << 16;
|
||||
buf += 3;
|
||||
}
|
||||
else
|
||||
{
|
||||
dataLength = *(uint32_t*) buf;
|
||||
buf += 4;
|
||||
}
|
||||
|
||||
// buf contains pointer to blob, for example:
|
||||
// (gdb) p (char*)*(uintptr_t*)buf
|
||||
// $43 = 0x7f68500c58f8 "hello world"
|
||||
|
||||
dataptr = (uintptr_t*)buf;
|
||||
ucharptr = (uchar*)*dataptr;
|
||||
buf += sizeof(uintptr_t);
|
||||
|
||||
if (isBlob)
|
||||
{
|
||||
for (uint32_t i = 0; i < dataLength; i++)
|
||||
{
|
||||
fprintf(ci.filePtr(), "%02x", *(uint8_t*)ucharptr);
|
||||
ucharptr++;
|
||||
}
|
||||
|
||||
fprintf(ci.filePtr(), "%c", ci.delimiter());
|
||||
}
|
||||
else
|
||||
{
|
||||
// TEXT Column
|
||||
std::string escape;
|
||||
escape.assign((char*)ucharptr, dataLength);
|
||||
boost::replace_all(escape, "\\", "\\\\");
|
||||
fprintf(ci.filePtr(), "%c%.*s%c%c",
|
||||
ci.enclosed_by(),
|
||||
(int)escape.length(), escape.c_str(),
|
||||
ci.enclosed_by(), ci.delimiter());
|
||||
}
|
||||
|
||||
return buf - buf0;
|
||||
return (UNLIKELY(m_type.colDataType == CalpontSystemCatalog::BLOB)) ?
|
||||
ColWriteBatchString(buf0, nullVal, ci, &ColWriteBatchBlobString) :
|
||||
ColWriteBatchString(buf0, nullVal, ci, &ColWriteBatchTextString);
|
||||
}
|
||||
|
||||
};
|
||||
|
@ -3186,6 +3186,9 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
|
||||
ci->fdt[0] = -1;
|
||||
// now we can send all the data thru FIFO[1], writer of PARENT
|
||||
}
|
||||
// Set read_set used for bulk insertion of Fields inheriting
|
||||
//from Field_blob|Field_varstring. Used in ColWriteBatchString()
|
||||
bitmap_set_all(table->read_set);
|
||||
|
||||
#endif
|
||||
}
|
||||
@ -3247,6 +3250,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
|
||||
setError(current_thd, ER_READ_ONLY_MODE, "Cannot execute the statement. DBRM is read only!");
|
||||
ci->rc = rc;
|
||||
ci->singleInsert = true;
|
||||
bitmap_clear_all(table->read_set);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -3256,6 +3260,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
|
||||
if (stateFlags & SessionManagerServer::SS_SUSPENDED)
|
||||
{
|
||||
setError(current_thd, ER_INTERNAL_ERROR, "Writing to the database is disabled.");
|
||||
bitmap_clear_all(table->read_set);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -3270,11 +3275,12 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
|
||||
}
|
||||
catch (IDBExcept& ie)
|
||||
{
|
||||
bitmap_clear_all(table->read_set);
|
||||
setError(thd, ER_INTERNAL_ERROR, ie.what());
|
||||
// setError(thd, ER_UNKNOWN_TABLE, ie.what());
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
bitmap_clear_all(table->read_set);
|
||||
setError(thd, ER_INTERNAL_ERROR,
|
||||
logging::IDBErrorInfo::instance()->errorMsg(ERR_SYSTEM_CATALOG) + ex.what());
|
||||
}
|
||||
@ -3288,6 +3294,9 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
|
||||
|
||||
int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
|
||||
{
|
||||
// Clear read_set used for bulk insertion of Fields inheriting
|
||||
//from Field_blob|Field_varstring
|
||||
bitmap_clear_all(table->read_set);
|
||||
THD* thd = current_thd;
|
||||
|
||||
std::string aTmpDir(startup::StartUp::tmpDir());
|
||||
|
Reference in New Issue
Block a user