You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-01 06:21:41 +03:00
Leverage MDB Field methods to output char or binary-based types when processing batch inserts
This commit is contained in:
@ -970,6 +970,15 @@ inline bool isCharType(const execplan::CalpontSystemCatalog::ColDataType type)
|
||||
execplan::CalpontSystemCatalog::TEXT == type);
|
||||
}
|
||||
|
||||
/** convenience function to determine if column type is BLOB or VARBINARY
|
||||
*/
|
||||
inline bool isBlobOrVarbinary(const execplan::CalpontSystemCatalog::ColDataType type)
|
||||
{
|
||||
return (execplan::CalpontSystemCatalog::BLOB == type ||
|
||||
execplan::CalpontSystemCatalog::VARBINARY == type);
|
||||
}
|
||||
|
||||
|
||||
/** convenience function to determine if column type is a
|
||||
* numeric type
|
||||
*/
|
||||
|
@ -884,93 +884,6 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case CalpontSystemCatalog::CHAR:
|
||||
{
|
||||
uint32_t colWidthInBytes = ci.columnTypes[colpos].colWidth;
|
||||
|
||||
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
|
||||
{
|
||||
fprintf(ci.filePtr, "%c", ci.delimiter);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (current_thd->variables.sql_mode & MODE_PAD_CHAR_TO_FULL_LENGTH)
|
||||
{
|
||||
// Pad to the full length of the field
|
||||
escape.assign((char*)buf, colWidthInBytes);
|
||||
|
||||
boost::replace_all(escape, "\\", "\\\\");
|
||||
|
||||
fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(),
|
||||
escape.c_str(), ci.enclosed_by, ci.delimiter);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Get the actual data length
|
||||
Field* field = table->field[colpos];
|
||||
bitmap_set_bit(table->read_set, field->field_index);
|
||||
String attribute;
|
||||
field->val_str(&attribute);
|
||||
|
||||
escape.assign((char*)buf, attribute.length());
|
||||
boost::replace_all(escape, "\\", "\\\\");
|
||||
|
||||
fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(),
|
||||
escape.c_str(), ci.enclosed_by, ci.delimiter);
|
||||
}
|
||||
}
|
||||
|
||||
buf += colWidthInBytes;
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case CalpontSystemCatalog::VARCHAR:
|
||||
{
|
||||
uint32_t colWidthInBytes = ci.columnTypes[colpos].colWidth;
|
||||
|
||||
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
|
||||
{
|
||||
fprintf(ci.filePtr, "%c", ci.delimiter);
|
||||
|
||||
if (colWidthInBytes < 256)
|
||||
{
|
||||
buf++;
|
||||
}
|
||||
else
|
||||
{
|
||||
buf = buf + 2 ;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Maximum number of bytes allowed for a VARCHAR
|
||||
// field is 65532, so the max length fits in 2 bytes.
|
||||
// dataLength is length in bytes, not length in chars
|
||||
uint16_t dataLength = 0;
|
||||
|
||||
if (colWidthInBytes < 256)
|
||||
{
|
||||
dataLength = *(uint8_t*) buf;
|
||||
buf++;
|
||||
}
|
||||
else
|
||||
{
|
||||
dataLength = *(uint16_t*) buf;
|
||||
buf = buf + 2 ;
|
||||
}
|
||||
|
||||
escape.assign((char*)buf, dataLength);
|
||||
boost::replace_all(escape, "\\", "\\\\");
|
||||
fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(), escape.c_str(), ci.enclosed_by, ci.delimiter);
|
||||
}
|
||||
|
||||
buf += colWidthInBytes;
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case CalpontSystemCatalog::BIGINT:
|
||||
{
|
||||
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
|
||||
@ -1659,134 +1572,43 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
|
||||
break;
|
||||
}
|
||||
|
||||
case CalpontSystemCatalog::CHAR:
|
||||
case CalpontSystemCatalog::TEXT:
|
||||
case CalpontSystemCatalog::VARCHAR:
|
||||
case CalpontSystemCatalog::BLOB:
|
||||
case CalpontSystemCatalog::VARBINARY:
|
||||
{
|
||||
// For a VARBINARY field, ci.columnTypes[colpos].colWidth == colWidthInBytes
|
||||
Field* fieldPtr = table->field[colpos];
|
||||
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
|
||||
{
|
||||
fprintf(ci.filePtr, "%c", ci.delimiter);
|
||||
|
||||
if (ci.columnTypes[colpos].colWidth < 256)
|
||||
}
|
||||
else
|
||||
{
|
||||
String value;
|
||||
// We need to set table->read_set for a field first.
|
||||
// This happens in ha_mcs_impl_start_bulk_insert().
|
||||
fieldPtr->val_str(&value);
|
||||
if (UNLIKELY(execplan::isBlobOrVarbinary(ci.columnTypes[colpos].colDataType)))
|
||||
{
|
||||
buf++;
|
||||
const char* ptr = value.ptr();
|
||||
for (uint32_t i = 0; i < value.length(); i++)
|
||||
{
|
||||
fprintf(ci.filePtr, "%02x", *(uint8_t*)ptr+i);
|
||||
}
|
||||
fprintf(ci.filePtr, "%c", ci.delimiter);
|
||||
}
|
||||
else
|
||||
{
|
||||
buf = buf + 2;
|
||||
escape.assign(value.ptr(), value.length());
|
||||
boost::replace_all(escape, "\\", "\\\\");
|
||||
fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(),
|
||||
escape.c_str(), ci.enclosed_by, ci.delimiter);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Maximum number of bytes allowed for a VARBINARY
|
||||
// field is 65532, so the max length fits in 2 bytes.
|
||||
// dataLength is length in bytes, not length in chars
|
||||
uint16_t dataLength = 0;
|
||||
|
||||
if (ci.columnTypes[colpos].colWidth < 256)
|
||||
{
|
||||
dataLength = *(uint8_t*) buf;
|
||||
buf++;
|
||||
}
|
||||
else
|
||||
{
|
||||
dataLength = *(uint16_t*) buf;
|
||||
buf = buf + 2 ;
|
||||
}
|
||||
|
||||
const uchar* tmpBuf = buf;
|
||||
|
||||
for (int32_t i = 0; i < dataLength; i++)
|
||||
{
|
||||
fprintf(ci.filePtr, "%02x", *(uint8_t*)tmpBuf);
|
||||
tmpBuf++;
|
||||
}
|
||||
|
||||
fprintf(ci.filePtr, "%c", ci.delimiter);
|
||||
}
|
||||
|
||||
buf += ci.columnTypes[colpos].colWidth;
|
||||
|
||||
buf+= fieldPtr->pack_length();
|
||||
break;
|
||||
}
|
||||
|
||||
case CalpontSystemCatalog::BLOB:
|
||||
case CalpontSystemCatalog::TEXT:
|
||||
{
|
||||
// MCOL-4005 Note that we don't handle nulls as a special
|
||||
// case here as we do for other datatypes, the below works
|
||||
// as expected for nulls.
|
||||
// dataLength is length in bytes, not length in chars
|
||||
uint32_t dataLength = 0;
|
||||
uintptr_t* dataptr;
|
||||
uchar* ucharptr;
|
||||
uint32_t colWidthInBytes;
|
||||
|
||||
bool isBlob = ci.columnTypes[colpos].colDataType == CalpontSystemCatalog::BLOB;
|
||||
|
||||
if (isBlob)
|
||||
{
|
||||
colWidthInBytes = ci.columnTypes[colpos].colWidth;
|
||||
}
|
||||
else
|
||||
{
|
||||
// For TEXT fields, MDB sets char_length to the maximum number that will fit in the number of bytes the
|
||||
// defined TEXT length will fit in.
|
||||
// Ex:
|
||||
// TEXT(25) will have char_length() == 255;
|
||||
// TEXT(200) for latin_1 will have char_length() = 255
|
||||
// TEXT(200) for udf8mb4 will have char_length() = 65535
|
||||
// the length 200 multiplied by mbmaxlen (4) is > 255, so it needs 2 bytes for length. MDB sets to max(uint_16t)
|
||||
colWidthInBytes = table->field[colpos]->char_length();
|
||||
}
|
||||
|
||||
if (colWidthInBytes < 256)
|
||||
{
|
||||
dataLength = *(uint8_t*) buf;
|
||||
buf++;
|
||||
}
|
||||
else if (colWidthInBytes < 65536)
|
||||
{
|
||||
dataLength = *(uint16_t*) buf;
|
||||
buf += 2;
|
||||
}
|
||||
else if (colWidthInBytes < 16777216)
|
||||
{
|
||||
dataLength = *(uint16_t*) buf;
|
||||
dataLength |= ((int) buf[2]) << 16;
|
||||
buf += 3;
|
||||
}
|
||||
else
|
||||
{
|
||||
dataLength = *(uint32_t*) buf;
|
||||
buf += 4;
|
||||
}
|
||||
|
||||
// buf contains pointer to blob, for example:
|
||||
// (gdb) p (char*)*(uintptr_t*)buf
|
||||
// $43 = 0x7f68500c58f8 "hello world"
|
||||
dataptr = (uintptr_t*)buf;
|
||||
ucharptr = (uchar*)*dataptr;
|
||||
buf += sizeof(uintptr_t);
|
||||
|
||||
if (isBlob)
|
||||
{
|
||||
for (uint32_t i = 0; i < dataLength; i++)
|
||||
{
|
||||
fprintf(ci.filePtr, "%02x", *(uint8_t*)ucharptr);
|
||||
ucharptr++;
|
||||
}
|
||||
fprintf(ci.filePtr, "%c", ci.delimiter);
|
||||
}
|
||||
else
|
||||
{
|
||||
escape.assign((char*)ucharptr, dataLength);
|
||||
boost::replace_all(escape, "\\", "\\\\");
|
||||
fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(), escape.c_str(), ci.enclosed_by, ci.delimiter);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
default: // treat as int64
|
||||
{
|
||||
break;
|
||||
|
@ -3539,7 +3539,9 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
|
||||
ci->fdt[0] = -1;
|
||||
// now we can send all the data thru FIFO[1], writer of PARENT
|
||||
}
|
||||
|
||||
// Set read_set used for bulk insertion of Fields inheriting
|
||||
//from Field_blob|Field_varstring
|
||||
bitmap_set_all(table->read_set);
|
||||
#endif
|
||||
}
|
||||
else
|
||||
@ -3600,6 +3602,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
|
||||
setError(current_thd, ER_READ_ONLY_MODE, "Cannot execute the statement. DBRM is read only!");
|
||||
ci->rc = rc;
|
||||
ci->singleInsert = true;
|
||||
bitmap_clear_all(table->read_set);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -3609,6 +3612,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
|
||||
if (stateFlags & SessionManagerServer::SS_SUSPENDED)
|
||||
{
|
||||
setError(current_thd, ER_INTERNAL_ERROR, "Writing to the database is disabled.");
|
||||
bitmap_clear_all(table->read_set);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -3624,10 +3628,11 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
|
||||
catch (IDBExcept& ie)
|
||||
{
|
||||
setError(thd, ER_INTERNAL_ERROR, ie.what());
|
||||
// setError(thd, ER_UNKNOWN_TABLE, ie.what());
|
||||
bitmap_clear_all(table->read_set);
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
bitmap_clear_all(table->read_set);
|
||||
setError(thd, ER_INTERNAL_ERROR,
|
||||
logging::IDBErrorInfo::instance()->errorMsg(ERR_SYSTEM_CATALOG) + ex.what());
|
||||
}
|
||||
@ -3641,6 +3646,9 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
|
||||
|
||||
int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
|
||||
{
|
||||
// Clear read_set used for bulk insertion of Fields inheriting
|
||||
//from Field_blob|Field_varstring
|
||||
bitmap_clear_all(table->read_set);
|
||||
THD* thd = current_thd;
|
||||
|
||||
std::string aTmpDir(startup::StartUp::tmpDir());
|
||||
@ -3670,7 +3678,6 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
|
||||
((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) || ci->isCacheInsert) )
|
||||
{
|
||||
#ifdef _MSC_VER
|
||||
|
||||
if (thd->killed > 0)
|
||||
{
|
||||
errno = 0;
|
||||
|
Reference in New Issue
Block a user