1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-4931 Make cpimport charset-aware. (#2938)

1. Extend the following CalpontSystemCatalog member functions to
   set CalpontSystemCatalog::ColType::charsetNumber, after the
   system catalog update to add charset number to calpontsys.syscolumn
   in MCOL-5005:
     CalpontSystemCatalog::lookupOID
     CalpontSystemCatalog::colType
     CalpontSystemCatalog::columnRIDs
     CalpontSystemCatalog::getSchemaInfo

2. Update cpimport to use the CHARSET_INFO object associated with the
   charset number retrieved from the system catalog, for a
   dictionary/non-dictionary CHAR/VARCHAR/TEXT column, to truncate
   long strings that exceed the target column character length.

3. Add MTR test cases.
This commit is contained in:
Gagan Goel
2023-09-05 10:17:20 -04:00
committed by GitHub
parent 5b4f06bf0d
commit 931f2b36a1
12 changed files with 211 additions and 72 deletions

View File

@ -553,8 +553,9 @@ CalpontSystemCatalog::OID CalpontSystemCatalog::lookupOID(const TableColName& ta
string autoincrement = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + AUTOINC_COL; string autoincrement = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + AUTOINC_COL;
string nextVal = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + NEXTVALUE_COL; string nextVal = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + NEXTVALUE_COL;
string nullable = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + NULLABLE_COL; string nullable = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + NULLABLE_COL;
string charsetnum = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + CHARSETNUM_COL;
SimpleColumn* col[17]; SimpleColumn* col[18];
col[0] = new SimpleColumn(columnlength, fSessionID); col[0] = new SimpleColumn(columnlength, fSessionID);
col[1] = new SimpleColumn(objectid, fSessionID); col[1] = new SimpleColumn(objectid, fSessionID);
col[2] = new SimpleColumn(datatype, fSessionID); col[2] = new SimpleColumn(datatype, fSessionID);
@ -572,6 +573,7 @@ CalpontSystemCatalog::OID CalpontSystemCatalog::lookupOID(const TableColName& ta
col[14] = new SimpleColumn(autoincrement, fSessionID); col[14] = new SimpleColumn(autoincrement, fSessionID);
col[15] = new SimpleColumn(nextVal, fSessionID); col[15] = new SimpleColumn(nextVal, fSessionID);
col[16] = new SimpleColumn(nullable, fSessionID); col[16] = new SimpleColumn(nullable, fSessionID);
col[17] = new SimpleColumn(charsetnum, fSessionID);
SRCP srcp; SRCP srcp;
srcp.reset(col[0]); srcp.reset(col[0]);
@ -609,13 +611,15 @@ CalpontSystemCatalog::OID CalpontSystemCatalog::lookupOID(const TableColName& ta
colMap.insert(CMVT_(nextVal, srcp)); colMap.insert(CMVT_(nextVal, srcp));
srcp.reset(col[16]); srcp.reset(col[16]);
colMap.insert(CMVT_(nullable, srcp)); colMap.insert(CMVT_(nullable, srcp));
srcp.reset(col[17]);
colMap.insert(CMVT_(charsetnum, srcp));
csep.columnMapNonStatic(colMap); csep.columnMapNonStatic(colMap);
// ignore returnedcolumn, because it's not read by Joblist for now // ignore returnedcolumn, because it's not read by Joblist for now
csep.returnedCols(returnedColumnList); csep.returnedCols(returnedColumnList);
OID oid[17]; OID oid[18];
for (int i = 0; i < 17; i++) for (int i = 0; i < 18; i++)
oid[i] = col[i]->oid(); oid[i] = col[i]->oid();
// Filters // Filters
@ -709,6 +713,8 @@ CalpontSystemCatalog::OID CalpontSystemCatalog::lookupOID(const TableColName& ta
ct.constraintType = NOTNULL_CONSTRAINT; ct.constraintType = NOTNULL_CONSTRAINT;
} }
} }
else if ((*it)->ColumnOID() == oid[17])
ct.charsetNumber = ((*it)->GetData(0));
else if ((*it)->ColumnOID() == DICTOID_SYSCOLUMN_DEFAULTVAL) else if ((*it)->ColumnOID() == DICTOID_SYSCOLUMN_DEFAULTVAL)
{ {
ct.defaultValue = ((*it)->GetStringData(0)); ct.defaultValue = ((*it)->GetStringData(0));
@ -1077,8 +1083,9 @@ const CalpontSystemCatalog::ColType CalpontSystemCatalog::colType(const OID& Oid
string compressionType = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + COMPRESSIONTYPE_COL; string compressionType = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + COMPRESSIONTYPE_COL;
string autoincrement = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + AUTOINC_COL; string autoincrement = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + AUTOINC_COL;
string nextvalue = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + NEXTVALUE_COL; string nextvalue = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + NEXTVALUE_COL;
string charsetnum = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + CHARSETNUM_COL;
SimpleColumn* col[17]; SimpleColumn* col[18];
col[0] = new SimpleColumn(columnlength, fSessionID); col[0] = new SimpleColumn(columnlength, fSessionID);
col[1] = new SimpleColumn(objectid, fSessionID); col[1] = new SimpleColumn(objectid, fSessionID);
col[2] = new SimpleColumn(datatype, fSessionID); col[2] = new SimpleColumn(datatype, fSessionID);
@ -1096,6 +1103,7 @@ const CalpontSystemCatalog::ColType CalpontSystemCatalog::colType(const OID& Oid
col[14] = new SimpleColumn(compressionType, fSessionID); col[14] = new SimpleColumn(compressionType, fSessionID);
col[15] = new SimpleColumn(autoincrement, fSessionID); col[15] = new SimpleColumn(autoincrement, fSessionID);
col[16] = new SimpleColumn(nextvalue, fSessionID); col[16] = new SimpleColumn(nextvalue, fSessionID);
col[17] = new SimpleColumn(charsetnum, fSessionID);
SRCP srcp; SRCP srcp;
srcp.reset(col[0]); srcp.reset(col[0]);
@ -1134,14 +1142,16 @@ const CalpontSystemCatalog::ColType CalpontSystemCatalog::colType(const OID& Oid
colMap.insert(CMVT_(autoincrement, srcp)); colMap.insert(CMVT_(autoincrement, srcp));
srcp.reset(col[16]); srcp.reset(col[16]);
colMap.insert(CMVT_(nextvalue, srcp)); colMap.insert(CMVT_(nextvalue, srcp));
srcp.reset(col[17]);
colMap.insert(CMVT_(charsetnum, srcp));
csep.columnMapNonStatic(colMap); csep.columnMapNonStatic(colMap);
// ignore returnedcolumn, because it's not read by Joblist for now // ignore returnedcolumn, because it's not read by Joblist for now
csep.returnedCols(returnedColumnList); csep.returnedCols(returnedColumnList);
OID oid[17]; OID oid[18];
for (int i = 0; i < 17; i++) for (int i = 0; i < 18; i++)
oid[i] = col[i]->oid(); oid[i] = col[i]->oid();
// Filters // Filters
@ -1231,6 +1241,8 @@ const CalpontSystemCatalog::ColType CalpontSystemCatalog::colType(const OID& Oid
} }
else if ((*it)->ColumnOID() == oid[16]) else if ((*it)->ColumnOID() == oid[16])
ct.nextvalue = ((*it)->GetData(0)); ct.nextvalue = ((*it)->GetData(0));
else if ((*it)->ColumnOID() == oid[17])
ct.charsetNumber = ((*it)->GetData(0));
ct.columnOID = Oid; ct.columnOID = Oid;
} }
@ -3055,8 +3067,9 @@ const CalpontSystemCatalog::RIDList CalpontSystemCatalog::columnRIDs(const Table
string compressiontype = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + COMPRESSIONTYPE_COL; string compressiontype = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + COMPRESSIONTYPE_COL;
string autoIncrement = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + AUTOINC_COL; string autoIncrement = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + AUTOINC_COL;
string nextVal = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + NEXTVALUE_COL; string nextVal = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + NEXTVALUE_COL;
string charsetnum = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + CHARSETNUM_COL;
SimpleColumn* col[17]; SimpleColumn* col[18];
col[0] = new SimpleColumn(columnlength, fSessionID); col[0] = new SimpleColumn(columnlength, fSessionID);
col[1] = new SimpleColumn(objectid, fSessionID); col[1] = new SimpleColumn(objectid, fSessionID);
col[2] = new SimpleColumn(datatype, fSessionID); col[2] = new SimpleColumn(datatype, fSessionID);
@ -3074,6 +3087,7 @@ const CalpontSystemCatalog::RIDList CalpontSystemCatalog::columnRIDs(const Table
col[14] = new SimpleColumn(compressiontype, fSessionID); col[14] = new SimpleColumn(compressiontype, fSessionID);
col[15] = new SimpleColumn(autoIncrement, fSessionID); col[15] = new SimpleColumn(autoIncrement, fSessionID);
col[16] = new SimpleColumn(nextVal, fSessionID); col[16] = new SimpleColumn(nextVal, fSessionID);
col[17] = new SimpleColumn(charsetnum, fSessionID);
SRCP srcp; SRCP srcp;
srcp.reset(col[0]); srcp.reset(col[0]);
@ -3111,15 +3125,17 @@ const CalpontSystemCatalog::RIDList CalpontSystemCatalog::columnRIDs(const Table
colMap.insert(CMVT_(autoIncrement, srcp)); colMap.insert(CMVT_(autoIncrement, srcp));
srcp.reset(col[16]); srcp.reset(col[16]);
colMap.insert(CMVT_(nextVal, srcp)); colMap.insert(CMVT_(nextVal, srcp));
srcp.reset(col[17]);
colMap.insert(CMVT_(charsetnum, srcp));
csep.columnMapNonStatic(colMap); csep.columnMapNonStatic(colMap);
srcp.reset(col[1]->clone()); srcp.reset(col[1]->clone());
returnedColumnList.push_back(srcp); returnedColumnList.push_back(srcp);
csep.returnedCols(returnedColumnList); csep.returnedCols(returnedColumnList);
OID oid[17]; OID oid[18];
for (int i = 0; i < 17; i++) for (int i = 0; i < 18; i++)
oid[i] = col[i]->oid(); oid[i] = col[i]->oid();
oid[12] = DICTOID_SYSCOLUMN_COLNAME; oid[12] = DICTOID_SYSCOLUMN_COLNAME;
@ -3303,6 +3319,11 @@ const CalpontSystemCatalog::RIDList CalpontSystemCatalog::columnRIDs(const Table
for (int i = 0; i < (*it)->dataCount(); i++) for (int i = 0; i < (*it)->dataCount(); i++)
ctList[i].nextvalue = ((*it)->GetData(i)); ctList[i].nextvalue = ((*it)->GetData(i));
} }
else if ((*it)->ColumnOID() == oid[17])
{
for (int i = 0; i < (*it)->dataCount(); i++)
ctList[i].charsetNumber = ((*it)->GetData(i));
}
} }
// MCOL-895 sort ctList, we can't specify an ORDER BY to do this yet // MCOL-895 sort ctList, we can't specify an ORDER BY to do this yet
@ -5546,8 +5567,9 @@ void CalpontSystemCatalog::getSchemaInfo(const string& in_schema, int lower_case
string compressiontype = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + COMPRESSIONTYPE_COL; string compressiontype = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + COMPRESSIONTYPE_COL;
string autoinc = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + AUTOINC_COL; string autoinc = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + AUTOINC_COL;
string nextval = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + NEXTVALUE_COL; string nextval = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + NEXTVALUE_COL;
string charsetnum = CALPONT_SCHEMA + "." + SYSCOLUMN_TABLE + "." + CHARSETNUM_COL;
SimpleColumn* col[17]; SimpleColumn* col[18];
col[0] = new SimpleColumn(columnlength, fSessionID); col[0] = new SimpleColumn(columnlength, fSessionID);
col[1] = new SimpleColumn(objectid, fSessionID); col[1] = new SimpleColumn(objectid, fSessionID);
col[2] = new SimpleColumn(datatype, fSessionID); col[2] = new SimpleColumn(datatype, fSessionID);
@ -5565,6 +5587,7 @@ void CalpontSystemCatalog::getSchemaInfo(const string& in_schema, int lower_case
col[14] = new SimpleColumn(compressiontype, fSessionID); col[14] = new SimpleColumn(compressiontype, fSessionID);
col[15] = new SimpleColumn(autoinc, fSessionID); col[15] = new SimpleColumn(autoinc, fSessionID);
col[16] = new SimpleColumn(nextval, fSessionID); col[16] = new SimpleColumn(nextval, fSessionID);
col[17] = new SimpleColumn(charsetnum, fSessionID);
SRCP srcp; SRCP srcp;
srcp.reset(col[0]); srcp.reset(col[0]);
@ -5603,15 +5626,17 @@ void CalpontSystemCatalog::getSchemaInfo(const string& in_schema, int lower_case
colMap.insert(CMVT_(autoinc, srcp)); colMap.insert(CMVT_(autoinc, srcp));
srcp.reset(col[16]); srcp.reset(col[16]);
colMap.insert(CMVT_(nextval, srcp)); colMap.insert(CMVT_(nextval, srcp));
srcp.reset(col[17]);
colMap.insert(CMVT_(charsetnum, srcp));
csep.columnMapNonStatic(colMap); csep.columnMapNonStatic(colMap);
srcp.reset(col[1]->clone()); srcp.reset(col[1]->clone());
returnedColumnList.push_back(srcp); returnedColumnList.push_back(srcp);
csep.returnedCols(returnedColumnList); csep.returnedCols(returnedColumnList);
OID oid[17]; OID oid[18];
for (int i = 0; i < 17; i++) for (int i = 0; i < 18; i++)
oid[i] = col[i]->oid(); oid[i] = col[i]->oid();
oid[12] = DICTOID_SYSCOLUMN_COLNAME; oid[12] = DICTOID_SYSCOLUMN_COLNAME;
@ -5805,6 +5830,11 @@ void CalpontSystemCatalog::getSchemaInfo(const string& in_schema, int lower_case
for (int i = 0; i < (*it)->dataCount(); i++) for (int i = 0; i < (*it)->dataCount(); i++)
ctList[i].nextvalue = ((*it)->GetData(i)); ctList[i].nextvalue = ((*it)->GetData(i));
} }
else if ((*it)->ColumnOID() == oid[17])
{
for (int i = 0; i < (*it)->dataCount(); i++)
ctList[i].charsetNumber = ((*it)->GetData(i));
}
} }
// populate colinfo cache // populate colinfo cache

View File

@ -0,0 +1,44 @@
DROP DATABASE IF EXISTS mcol_4931;
CREATE DATABASE mcol_4931;
USE mcol_4931;
CREATE TABLE t1 (
a VARCHAR(15) collate 'utf8mb4_croatian_ci',
b CHAR(15) collate 'utf8mb4_croatian_ci',
c VARCHAR(2),
d CHAR(2)
)ENGINE=columnstore default charset=utf8mb4;
CREATE TABLE t2 (
a VARCHAR(15) collate 'latin2_hungarian_ci',
b CHAR(15) collate 'latin2_hungarian_ci',
c VARCHAR(2),
d CHAR(2)
)ENGINE=columnstore default charset=latin2;
LOAD DATA INFILE "DATADIR/mcol4931_1.txt" IGNORE INTO TABLE t1 charset utf8mb4 FIELDS TERMINATED BY "," ENCLOSED BY '"';;
Warnings:
Warning 1265 Data truncated for column 'a' at row 1
Warning 1265 Data truncated for column 'b' at row 1
Warning 1265 Data truncated for column 'c' at row 1
Warning 1265 Data truncated for column 'd' at row 1
LOAD DATA INFILE "DATADIR/mcol4931_2.txt" IGNORE INTO TABLE t2 charset latin2 FIELDS TERMINATED BY "," ENCLOSED BY '"';;
Warnings:
Warning 1265 Data truncated for column 'a' at row 1
Warning 1265 Data truncated for column 'b' at row 1
Warning 1265 Data truncated for column 'c' at row 1
Warning 1265 Data truncated for column 'd' at row 1
SELECT * FROM t1;
a b c d
K<EFBFBD>nig-abcd-Stra K<>nig-abcd-Stra K<> K<>
K<EFBFBD>nig-abcd-Stra K<>nig-abcd-Stra K<> K<>
SELECT CHAR_LENGTH(a), LENGTHB(a), CHAR_LENGTH(b), LENGTHB(b), CHAR_LENGTH(c), LENGTHB(c), CHAR_LENGTH(d), LENGTHB(d) FROM t1;
CHAR_LENGTH(a) LENGTHB(a) CHAR_LENGTH(b) LENGTHB(b) CHAR_LENGTH(c) LENGTHB(c) CHAR_LENGTH(d) LENGTHB(d)
15 16 15 16 2 3 2 3
15 16 15 16 2 3 2 3
SELECT * FROM t2;
a b c d
abcdefghijklmno abcdefghijklmno ab ab
abcdefghijklmno abcdefghijklmno ab ab
SELECT CHAR_LENGTH(a), LENGTHB(a), CHAR_LENGTH(b), LENGTHB(b), CHAR_LENGTH(c), LENGTHB(c), CHAR_LENGTH(d), LENGTHB(d) FROM t2;
CHAR_LENGTH(a) LENGTHB(a) CHAR_LENGTH(b) LENGTHB(b) CHAR_LENGTH(c) LENGTHB(c) CHAR_LENGTH(d) LENGTHB(d)
15 15 15 15 2 2 2 2
15 15 15 15 2 2 2 2
DROP DATABASE mcol_4931;

View File

@ -0,0 +1,57 @@
#
# MCOL-4931 Make cpimport charset aware
#
if (!$MYSQL_TEST_ROOT){
skip Should be run by root to execute cpimport;
}
--source ../include/have_columnstore.inc
let $DATADIR=`SELECT @@datadir`;
--disable_warnings
DROP DATABASE IF EXISTS mcol_4931;
--enable_warnings
CREATE DATABASE mcol_4931;
USE mcol_4931;
CREATE TABLE t1 (
a VARCHAR(15) collate 'utf8mb4_croatian_ci',
b CHAR(15) collate 'utf8mb4_croatian_ci',
c VARCHAR(2),
d CHAR(2)
)ENGINE=columnstore default charset=utf8mb4;
CREATE TABLE t2 (
a VARCHAR(15) collate 'latin2_hungarian_ci',
b CHAR(15) collate 'latin2_hungarian_ci',
c VARCHAR(2),
d CHAR(2)
)ENGINE=columnstore default charset=latin2;
--exec rm -f $DATADIR/mcol4931_1.txt
--exec rm -f $DATADIR/mcol4931_2.txt
--exec echo "\"König-abcd-Straße\",\"König-abcd-Straße\",\"König-abcd-Straße\",\"König-abcd-Straße\"" > $DATADIR/mcol4931_1.txt
--exec echo "\"abcdefghijklmnopq\",\"abcdefghijklmnopq\",\"abcdefghijklmnopq\",\"abcdefghijklmnopq\"" > $DATADIR/mcol4931_2.txt
--replace_result $DATADIR DATADIR
--eval LOAD DATA INFILE "$DATADIR/mcol4931_1.txt" IGNORE INTO TABLE t1 charset utf8mb4 FIELDS TERMINATED BY "," ENCLOSED BY '"';
--replace_result $DATADIR DATADIR
--eval LOAD DATA INFILE "$DATADIR/mcol4931_2.txt" IGNORE INTO TABLE t2 charset latin2 FIELDS TERMINATED BY "," ENCLOSED BY '"';
--disable_result_log
--exec $MCS_CPIMPORT -s',' -E'"' mcol_4931 t1 '$DATADIR/mcol4931_1.txt';
--exec $MCS_CPIMPORT -s',' -E'"' mcol_4931 t2 '$DATADIR/mcol4931_2.txt';
--enable_result_log
SELECT * FROM t1;
SELECT CHAR_LENGTH(a), LENGTHB(a), CHAR_LENGTH(b), LENGTHB(b), CHAR_LENGTH(c), LENGTHB(c), CHAR_LENGTH(d), LENGTHB(d) FROM t1;
SELECT * FROM t2;
SELECT CHAR_LENGTH(a), LENGTHB(a), CHAR_LENGTH(b), LENGTHB(b), CHAR_LENGTH(c), LENGTHB(c), CHAR_LENGTH(d), LENGTHB(d) FROM t2;
# Clean UP
--exec rm -f $DATADIR/mcol4931_1.txt
--exec rm -f $DATADIR/mcol4931_2.txt
DROP DATABASE mcol_4931;

View File

@ -91,33 +91,6 @@ inline std::string wstring_to_utf8(const std::wstring& str)
return ret; return ret;
} }
inline uint8_t utf8_truncate_point(const char* input, size_t length)
{
// Find the beginning of a multibyte char to truncate at and return the
// number of bytes to truncate1`
if (length < 3)
{
return 0;
}
const unsigned char* b = (const unsigned char*)(input) + length - 3;
if (b[2] & 0x80)
{
// First byte in a new multi-byte sequence
if (b[2] & 0x40)
return 1;
// 3 byte sequence
else if ((b[1] & 0xe0) == 0xe0)
return 2;
// 4 byte sequence
else if ((b[0] & 0xf0) == 0xf0)
return 3;
}
return 0;
}
int mcs_strcoll(const char* str1, const char* str2, const uint32_t charsetNumber); int mcs_strcoll(const char* str1, const char* str2, const uint32_t charsetNumber);
int mcs_strcoll(const char* str1, const uint32_t l1, const char* str2, const uint32_t l2, int mcs_strcoll(const char* str1, const uint32_t l1, const char* str2, const uint32_t l2,
const uint32_t charsetNumber); const uint32_t charsetNumber);

View File

@ -48,6 +48,7 @@
#include "MonitorProcMem.h" #include "MonitorProcMem.h"
#include "dataconvert.h" #include "dataconvert.h"
#include "mcsconfig.h" #include "mcsconfig.h"
#include "mariadb_my_sys.h"
using namespace std; using namespace std;
using namespace WriteEngine; using namespace WriteEngine;
@ -1002,6 +1003,9 @@ int main(int argc, char** argv)
{ {
setupSignalHandlers(); setupSignalHandlers();
// Initialize the charset library
MY_INIT(argv[0]);
// Set locale language // Set locale language
const char* pLoc = setlocale(LC_ALL, ""); const char* pLoc = setlocale(LC_ALL, "");
if (pLoc) if (pLoc)
@ -1316,6 +1320,9 @@ int main(int argc, char** argv)
rc = ERR_UNKNOWN; rc = ERR_UNKNOWN;
} }
// Free up resources allocated by MY_INIT() above.
my_end(0);
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
// Log end of job to INFO log // Log end of job to INFO log
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------

View File

@ -43,8 +43,6 @@
#include "joblisttypes.h" #include "joblisttypes.h"
#include "utils_utf8.h" // utf8_truncate_point()
using namespace std; using namespace std;
using namespace boost; using namespace boost;
using namespace execplan; using namespace execplan;
@ -515,14 +513,32 @@ void BulkLoadBuffer::convert(char* field, int fieldLength, bool nullFlag, unsign
// from storing characters beyond the column's defined width. // from storing characters beyond the column's defined width.
// It contains the column definition width rather than the bytes // It contains the column definition width rather than the bytes
// on disk (e.g. 5 for a varchar(5) instead of 8). // on disk (e.g. 5 for a varchar(5) instead of 8).
if (fieldLength > column.definedWidth) if (column.cs->mbmaxlen > 1)
{ {
uint8_t truncate_point = utf8::utf8_truncate_point(field, column.definedWidth); const CHARSET_INFO* cs = column.cs;
memcpy(charTmpBuf, field, column.definedWidth - truncate_point); const char* start = (const char*) field;
bufStats.satCount++; const char* end = (const char*)(field + fieldLength);
size_t numChars = cs->numchars(start, end);
size_t maxCharLength = column.definedWidth / cs->mbmaxlen;
if (numChars > maxCharLength)
{
MY_STRCOPY_STATUS status;
cs->well_formed_char_length(start, end, maxCharLength, &status);
fieldLength = status.m_source_end_pos - start;
bufStats.satCount++;
}
} }
else else // cs->mbmaxlen == 1
memcpy(charTmpBuf, field, fieldLength); {
if (fieldLength > column.definedWidth)
{
fieldLength = column.definedWidth;
bufStats.satCount++;
}
}
memcpy(charTmpBuf, field, fieldLength);
} }
// Swap byte order before comparing character string // Swap byte order before comparing character string

View File

@ -1697,7 +1697,7 @@ int ColumnInfo::updateDctnryStore(char* buf, ColPosPair** pos, const int totalRo
Stats::stopParseEvent(WE_STATS_WAIT_TO_PARSE_DCT); Stats::stopParseEvent(WE_STATS_WAIT_TO_PARSE_DCT);
#endif #endif
int rc = fStore->insertDctnry(buf, pos, totalRow, id, tokenBuf, truncCount); int rc = fStore->insertDctnry(buf, pos, totalRow, id, tokenBuf, truncCount, column.cs);
if (rc != NO_ERROR) if (rc != NO_ERROR)
{ {

View File

@ -48,7 +48,6 @@ using namespace BRM;
#include "IDBPolicy.h" #include "IDBPolicy.h"
#include "cacheutils.h" #include "cacheutils.h"
using namespace idbdatafile; using namespace idbdatafile;
#include "utils_utf8.h" // utf8_truncate_point()
#include "checks.h" #include "checks.h"
namespace namespace
@ -764,7 +763,7 @@ int Dctnry::insertDctnry2(Signature& sig)
* failure - it did not write the header to block * failure - it did not write the header to block
******************************************************************************/ ******************************************************************************/
int Dctnry::insertDctnry(const char* buf, ColPosPair** pos, const int totalRow, const int col, char* tokenBuf, int Dctnry::insertDctnry(const char* buf, ColPosPair** pos, const int totalRow, const int col, char* tokenBuf,
long long& truncCount) long long& truncCount, const CHARSET_INFO* cs)
{ {
#ifdef PROFILE #ifdef PROFILE
Stats::startParseEvent(WE_STATS_PARSE_DCT); Stats::startParseEvent(WE_STATS_PARSE_DCT);
@ -837,12 +836,28 @@ int Dctnry::insertDctnry(const char* buf, ColPosPair** pos, const int totalRow,
curSig.signature = (unsigned char*)pIn; curSig.signature = (unsigned char*)pIn;
} }
// @Bug 2565: Truncate any strings longer than schema's column width if (cs->mbmaxlen > 1)
if (curSig.size > m_colWidth)
{ {
uint8_t truncate_point = utf8::utf8_truncate_point((const char*)curSig.signature, m_colWidth); const char* start = (const char*) curSig.signature;
curSig.size = m_colWidth - truncate_point; const char* end = (const char*)(curSig.signature + curSig.size);
++truncCount; size_t numChars = cs->numchars(start, end);
size_t maxCharLength = m_colWidth / cs->mbmaxlen;
if (numChars > maxCharLength)
{
MY_STRCOPY_STATUS status;
cs->well_formed_char_length(start, end, maxCharLength, &status);
curSig.size = status.m_source_end_pos - start;
truncCount++;
}
}
else // cs->mbmaxlen == 1
{
if (curSig.size > m_colWidth)
{
curSig.size = m_colWidth;
truncCount++;
}
} }
//...Search for the string in our string cache //...Search for the string in our string cache

View File

@ -168,7 +168,7 @@ class Dctnry : public DbFileOp
* @param tokenBuf - (output) list of tokens for the parsed strings * @param tokenBuf - (output) list of tokens for the parsed strings
*/ */
EXPORT int insertDctnry(const char* buf, ColPosPair** pos, const int totalRow, const int col, EXPORT int insertDctnry(const char* buf, ColPosPair** pos, const int totalRow, const int col,
char* tokenBuf, long long& truncCount); char* tokenBuf, long long& truncCount, const CHARSET_INFO* cs);
/** /**
* @brief Update dictionary store with tokenized strings (for DDL/DML use) * @brief Update dictionary store with tokenized strings (for DDL/DML use)

View File

@ -135,22 +135,6 @@ class DctnryStore : public DbFileOp
*/ */
EXPORT const int updateDctnryStore(unsigned char* sigValue, int& sigSize, Token& token); EXPORT const int updateDctnryStore(unsigned char* sigValue, int& sigSize, Token& token);
/**
* @brief Update dictionary store with tokenized strings (for Bulk use)
*
* @param buf - bulk buffer containing strings to be parsed
* @param pos - list of offsets into buf
* @param totalRow - total number of rows in buf
* @param col - the column to be parsed from buf
* @param colWidth - width of the dictionary column being parsed
* @param tokenBuf - (output) list of tokens for the parsed strings
*/
const int updateDctnryStore(const char* buf, ColPosPair** pos, const int totalRow, const int col,
const int colWidth, char* tokenBuf)
{
return (m_dctnry.insertDctnry(buf, pos, totalRow, col, colWidth, tokenBuf));
}
/** /**
* @brief TransId related function * @brief TransId related function
* *

View File

@ -40,6 +40,7 @@
#include "IDBDataFile.h" #include "IDBDataFile.h"
#include "IDBPolicy.h" #include "IDBPolicy.h"
#include "nullstring.h" #include "nullstring.h"
#include "collation.h" // For CHARSET_INFO struct
#undef EXPORT #undef EXPORT
#undef DELETE #undef DELETE
@ -410,6 +411,7 @@ struct JobColumn /** @brief Job Column Structure */
double fDefaultDbl; /** @brief Dbl/Flt column default */ double fDefaultDbl; /** @brief Dbl/Flt column default */
int128_t fDefaultWideDecimal; /** @brief Wide decimal column default */ int128_t fDefaultWideDecimal; /** @brief Wide decimal column default */
utils::NullString fDefaultChr; /** @brief Char column default */ utils::NullString fDefaultChr; /** @brief Char column default */
const CHARSET_INFO* cs; /** @brief character set info for the column */
JobColumn() JobColumn()
: mapOid(0) : mapOid(0)
, dataType(execplan::CalpontSystemCatalog::INT) , dataType(execplan::CalpontSystemCatalog::INT)
@ -435,6 +437,7 @@ struct JobColumn /** @brief Job Column Structure */
, fDefaultUInt(0) , fDefaultUInt(0)
, fDefaultDbl(0.0) , fDefaultDbl(0.0)
, fDefaultWideDecimal(0) , fDefaultWideDecimal(0)
, cs(nullptr)
{ {
} }
JobColumn(const std::string& colName_, OID mapOid_, const std::string& typeName_, JobColumn(const std::string& colName_, OID mapOid_, const std::string& typeName_,
@ -466,6 +469,7 @@ struct JobColumn /** @brief Job Column Structure */
, fDefaultUInt(defaultUInt_) , fDefaultUInt(defaultUInt_)
, fDefaultDbl(0.0) , fDefaultDbl(0.0)
, fDefaultWideDecimal(0) , fDefaultWideDecimal(0)
, cs(nullptr)
{ {
dctnry.fCompressionType = dctnryCompressionType_; dctnry.fCompressionType = dctnryCompressionType_;
} }

View File

@ -871,6 +871,15 @@ void XMLJob::fillInXMLDataAsLoaded(execplan::CalpontSystemCatalog::RIDList& colR
col.compressionType = colType.compressionType; col.compressionType = colType.compressionType;
col.dctnry.fCompressionType = colType.compressionType; col.dctnry.fCompressionType = colType.compressionType;
if (colType.charsetNumber != 0)
{
col.cs = &datatypes::Charset(colType.charsetNumber).getCharset();
}
else
{
col.cs = &my_charset_latin1;
}
if (colType.autoincrement) if (colType.autoincrement)
col.autoIncFlag = true; col.autoIncFlag = true;
else else