1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-08 14:22:09 +03:00

Merge pull request #141 from mariadb-corporation/MCOL-267

MCOL-267 Add BLOB/TEXT support
This commit is contained in:
dhall-InfiniDB
2017-03-23 12:19:40 -05:00
committed by GitHub
45 changed files with 538 additions and 214 deletions

View File

@@ -156,6 +156,14 @@ EXISTS {return EXISTS;}
CHANGE {return CHANGE;}
TRUNCATE {return TRUNCATE;}
VARBINARY {return VARBINARY;}
TINYBLOB {return TINYBLOB;}
BLOB {return BLOB;}
MEDIUMBLOB {return MEDIUMBLOB;}
LONGBLOB {return LONGBLOB;}
TINYTEXT {return TINYTEXT;}
TEXT {return TEXT;}
MEDIUMTEXT {return MEDIUMTEXT;}
LONGTEXT {return LONGTEXT;}
\n { lineno++;}

View File

@@ -110,12 +110,14 @@ char* copy_string(const char *str);
%}
%token ACTION ADD ALTER AUTO_INCREMENT BIGINT BIT IDB_BLOB CASCADE IDB_CHAR CHARACTER CHECK CLOB COLUMN
%token ACTION ADD ALTER AUTO_INCREMENT BIGINT BIT BLOB IDB_BLOB CASCADE IDB_CHAR
CHARACTER CHECK CLOB COLUMN
COLUMNS COMMENT CONSTRAINT CONSTRAINTS CREATE CURRENT_USER DATETIME DEC
DECIMAL DEFAULT DEFERRABLE DEFERRED IDB_DELETE DROP ENGINE
FOREIGN FULL IMMEDIATE INDEX INITIALLY IDB_INT INTEGER KEY MATCH MAX_ROWS
FOREIGN FULL IMMEDIATE INDEX INITIALLY IDB_INT INTEGER KEY LONGBLOB LONGTEXT
MATCH MAX_ROWS MEDIUMBLOB MEDIUMTEXT
MIN_ROWS MODIFY NO NOT NULL_TOK NUMBER NUMERIC ON PARTIAL PRECISION PRIMARY
REFERENCES RENAME RESTRICT SET SMALLINT TABLE TIME
REFERENCES RENAME RESTRICT SET SMALLINT TABLE TEXT TIME TINYBLOB TINYTEXT
TINYINT TO UNIQUE UNSIGNED UPDATE USER SESSION_USER SYSTEM_USER VARCHAR VARBINARY
VARYING WITH ZONE DOUBLE IDB_FLOAT REAL CHARSET IDB_IF EXISTS CHANGE TRUNCATE
@@ -136,6 +138,8 @@ VARYING WITH ZONE DOUBLE IDB_FLOAT REAL CHARSET IDB_IF EXISTS CHANGE TRUNCATE
%type <ata> ata_rename_table
%type <columnType> character_string_type
%type <columnType> binary_string_type
%type <columnType> blob_type
%type <columnType> text_type
%type <str> check_constraint_def
%type <columnConstraintDef> column_constraint
%type <columnConstraintDef> column_constraint_def
@@ -704,6 +708,8 @@ data_type:
| binary_string_type
| numeric_type
| datetime_type
| blob_type
| text_type
| IDB_BLOB
{
$$ = new ColumnType(DDL_BLOB);
@@ -865,6 +871,62 @@ binary_string_type:
}
;
blob_type:
BLOB '(' ICONST ')'
{
$$ = new ColumnType(DDL_BLOB);
$$->fLength = atol($3);
}
| BLOB
{
$$ = new ColumnType(DDL_BLOB);
$$->fLength = 65535;
}
| TINYBLOB
{
$$ = new ColumnType(DDL_BLOB);
$$->fLength = 255;
}
| MEDIUMBLOB
{
$$ = new ColumnType(DDL_BLOB);
$$->fLength = 16777215;
}
| LONGBLOB
{
$$ = new ColumnType(DDL_BLOB);
$$->fLength = 2100000000;
}
;
text_type:
TEXT '(' ICONST ')'
{
$$ = new ColumnType(DDL_BLOB);
$$->fLength = atol($3);
}
| TEXT
{
$$ = new ColumnType(DDL_BLOB);
$$->fLength = 65535;
}
| TINYTEXT
{
$$ = new ColumnType(DDL_BLOB);
$$->fLength = 255;
}
| MEDIUMTEXT
{
$$ = new ColumnType(DDL_BLOB);
$$->fLength = 16777215;
}
| LONGTEXT
{
$$ = new ColumnType(DDL_BLOB);
$$->fLength = 2100000000;
}
;
numeric_type:
exact_numeric_type
| approximate_numeric_type

View File

@@ -1008,7 +1008,7 @@ struct ColumnType
int fType;
/** @brief Length of datatype in bytes */
int fLength;
long fLength;
/** @brief SQL precision. This is the number of digits in the representation. */
int fPrecision;

View File

@@ -561,7 +561,8 @@ void AlterTableProcessor::addColumn (uint32_t sessionID, execplan::CalpontSystem
if ((columnDefPtr->fType->fType == CalpontSystemCatalog::CHAR && columnDefPtr->fType->fLength > 8) ||
(columnDefPtr->fType->fType == CalpontSystemCatalog::VARCHAR && columnDefPtr->fType->fLength > 7) ||
(columnDefPtr->fType->fType == CalpontSystemCatalog::VARBINARY && columnDefPtr->fType->fLength > 7))
(columnDefPtr->fType->fType == CalpontSystemCatalog::VARBINARY && columnDefPtr->fType->fLength > 7) ||
(columnDefPtr->fType->fType == CalpontSystemCatalog::BLOB))
{
isDict = true;
}

View File

@@ -244,7 +244,8 @@ keepGoing:
dataType = convertDataType(tableDef.fColumns[i]->fType->fType);
if ( (dataType == CalpontSystemCatalog::CHAR && tableDef.fColumns[i]->fType->fLength > 8) ||
(dataType == CalpontSystemCatalog::VARCHAR && tableDef.fColumns[i]->fType->fLength > 7) ||
(dataType == CalpontSystemCatalog::VARBINARY && tableDef.fColumns[i]->fType->fLength > 7) )
(dataType == CalpontSystemCatalog::VARBINARY && tableDef.fColumns[i]->fType->fLength > 7) ||
(dataType == CalpontSystemCatalog::BLOB && tableDef.fColumns[i]->fType->fLength > 7) )
numDictCols++;
}
fStartingColOID = fObjectIDManager.allocOIDs(numColumns+numDictCols+1); //include column, oids,dictionary oids and tableoid
@@ -533,7 +534,8 @@ cout << fTxnid.id << " Create table WE_SVR_WRITE_CREATE_SYSCOLUMN: " << errorMsg
bytestream << (uint32_t) colDefPtr->fType->fCompressiontype;
if ( (dataType == CalpontSystemCatalog::CHAR && colDefPtr->fType->fLength > 8) ||
(dataType == CalpontSystemCatalog::VARCHAR && colDefPtr->fType->fLength > 7) ||
(dataType == CalpontSystemCatalog::VARBINARY && colDefPtr->fType->fLength > 7) )
(dataType == CalpontSystemCatalog::VARBINARY && colDefPtr->fType->fLength > 7) ||
(dataType == CalpontSystemCatalog::BLOB && colDefPtr->fType->fLength > 7) )
{
bytestream << (uint32_t) (fStartingColOID+numColumns+(dictNum++)+1);
bytestream << (uint8_t) dataType;

View File

@@ -282,7 +282,7 @@ namespace ddlpackageprocessor
dictStruct.treeOid = colType.ddn.treeOID;
dictStruct.listOid = colType.ddn.listOID;
dictStruct.dctnryOid = colType.ddn.dictOID;
memcpy(dictTuple.sigValue, data.c_str(), data.length());
dictTuple.sigValue = data.c_str();
dictTuple.sigSize = data.length();
int error = NO_ERROR;
if ( NO_ERROR != (error = fWriteEngine->tokenize( fTxnID, dictStruct, dictTuple)) )

View File

@@ -673,7 +673,7 @@ boost::any DDLPackageProcessor::tokenizeData(execplan::CalpontSystemCatalog::SCN
//added for multifiles per oid
dictStruct.columnOid = colType.columnOID;
WriteEngine::DctnryTuple dictTuple;
memcpy(dictTuple.sigValue, str.c_str(), str.length());
dictTuple.sigValue = (unsigned char*)str.c_str();
dictTuple.sigSize = str.length();
int error = NO_ERROR;
if (NO_ERROR != (error = fWriteEngine.tokenize(txnID, dictStruct, dictTuple, false))) // @bug 5572 HDFS tmp file

View File

@@ -131,7 +131,7 @@ boost::any DMLPackageProcessor::tokenizeData( execplan::CalpontSystemCatalog::SC
dictStruct.dctnryOid = colType.ddn.dictOID;
//cout << "Dictionary OIDs: " << colType.ddn.treeOID << " " << colType.ddn.listOID << endl;
WriteEngine::DctnryTuple dictTuple;
memcpy(dictTuple.sigValue, data.c_str(), data.length());
dictTuple.sigValue = data.c_str();
dictTuple.sigSize = data.length();
int error = NO_ERROR;
if ( NO_ERROR != (error = fWriteEngine.tokenize( txnID, dictStruct, dictTuple)) )

View File

@@ -443,7 +443,8 @@ protected:
if (((colType.colDataType == execplan::CalpontSystemCatalog::CHAR) && (colType.colWidth > 8))
|| ((colType.colDataType == execplan::CalpontSystemCatalog::VARCHAR) && (colType.colWidth > 7))
|| ((colType.colDataType == execplan::CalpontSystemCatalog::DECIMAL) && (colType.precision > 18))
|| (colType.colDataType == execplan::CalpontSystemCatalog::VARBINARY))
|| (colType.colDataType == execplan::CalpontSystemCatalog::VARBINARY)
|| (colType.colDataType == execplan::CalpontSystemCatalog::BLOB))
{
return true;
}

View File

@@ -904,7 +904,9 @@ const CalpontSystemCatalog::TableAliasName make_aliasview(const std::string& s,
*/
inline bool isCharType(const execplan::CalpontSystemCatalog::ColDataType type)
{
return (execplan::CalpontSystemCatalog::VARCHAR == type || execplan::CalpontSystemCatalog::CHAR == type);
return (execplan::CalpontSystemCatalog::VARCHAR == type ||
execplan::CalpontSystemCatalog::CHAR == type ||
execplan::CalpontSystemCatalog::BLOB == type);
}
/** convenience function to determine if column type is an

View File

@@ -583,6 +583,7 @@ void SimpleColumn::evaluate(Row& row, bool& isNull)
break;
}
case CalpontSystemCatalog::VARBINARY:
case CalpontSystemCatalog::BLOB:
{
fResult.strVal = row.getVarBinaryStringField(fInputIndex);
break;

View File

@@ -176,6 +176,7 @@ const string SimpleFilter::data() const
if (dynamic_cast<ConstantColumn*>(fRhs) &&
(fRhs->resultType().colDataType == CalpontSystemCatalog::VARCHAR ||
fRhs->resultType().colDataType == CalpontSystemCatalog::CHAR ||
fRhs->resultType().colDataType == CalpontSystemCatalog::BLOB ||
fRhs->resultType().colDataType == CalpontSystemCatalog::VARBINARY ||
fRhs->resultType().colDataType == CalpontSystemCatalog::DATE ||
fRhs->resultType().colDataType == CalpontSystemCatalog::DATETIME))
@@ -185,6 +186,7 @@ const string SimpleFilter::data() const
if (dynamic_cast<ConstantColumn*>(fLhs) &&
(fLhs->resultType().colDataType == CalpontSystemCatalog::VARCHAR ||
fLhs->resultType().colDataType == CalpontSystemCatalog::CHAR ||
fLhs->resultType().colDataType == CalpontSystemCatalog::BLOB ||
fLhs->resultType().colDataType == CalpontSystemCatalog::VARBINARY ||
fLhs->resultType().colDataType == CalpontSystemCatalog::DATE ||
fLhs->resultType().colDataType == CalpontSystemCatalog::DATETIME))

View File

@@ -394,6 +394,7 @@ inline bool TreeNode::getBoolVal()
return (atoi(fResult.strVal.c_str()) != 0);
//FIXME: Huh???
case CalpontSystemCatalog::VARBINARY:
case CalpontSystemCatalog::BLOB:
if (fResultType.colWidth <= 7)
return (atoi((char*)(&fResult.origIntVal)) != 0);
return (atoi(fResult.strVal.c_str()) != 0);
@@ -440,6 +441,7 @@ inline const std::string& TreeNode::getStrVal()
break;
//FIXME: ???
case CalpontSystemCatalog::VARBINARY:
case CalpontSystemCatalog::BLOB:
if (fResultType.colWidth <= 7)
fResult.strVal = (char*)(&fResult.origIntVal);
break;
@@ -573,6 +575,7 @@ inline int64_t TreeNode::getIntVal()
return atoll(fResult.strVal.c_str());
//FIXME: ???
case CalpontSystemCatalog::VARBINARY:
case CalpontSystemCatalog::BLOB:
if (fResultType.colWidth <= 7)
return fResult.intVal;
return atoll(fResult.strVal.c_str());
@@ -656,6 +659,7 @@ inline float TreeNode::getFloatVal()
return atof(fResult.strVal.c_str());
//FIXME: ???
case CalpontSystemCatalog::VARBINARY:
case CalpontSystemCatalog::BLOB:
if (fResultType.colWidth <= 7)
return atof((char*)(&fResult.origIntVal));
return atof(fResult.strVal.c_str());
@@ -703,6 +707,7 @@ inline double TreeNode::getDoubleVal()
return strtod(fResult.strVal.c_str(), NULL);
//FIXME: ???
case CalpontSystemCatalog::VARBINARY:
case CalpontSystemCatalog::BLOB:
if (fResultType.colWidth <= 7)
return strtod((char*)(&fResult.origIntVal), NULL);
return strtod(fResult.strVal.c_str(), NULL);
@@ -746,6 +751,7 @@ inline IDB_Decimal TreeNode::getDecimalVal()
case CalpontSystemCatalog::VARCHAR:
throw logging::InvalidConversionExcept("TreeNode::getDecimalVal: non-support conversion from string");
case CalpontSystemCatalog::VARBINARY:
case CalpontSystemCatalog::BLOB:
throw logging::InvalidConversionExcept("TreeNode::getDecimalVal: non-support conversion from binary string");
case CalpontSystemCatalog::BIGINT:
case CalpontSystemCatalog::MEDINT:

View File

@@ -231,7 +231,7 @@ void CrossEngineStep::makeMappings()
}
void CrossEngineStep::setField(int i, const char* value, Row& row)
void CrossEngineStep::setField(int i, const char* value, unsigned long length, Row& row)
{
CalpontSystemCatalog::ColDataType colType = row.getColType(i);
@@ -243,6 +243,13 @@ void CrossEngineStep::setField(int i, const char* value, Row& row)
else
row.setStringField("", i);
}
else if ((colType == CalpontSystemCatalog::BLOB) || (colType == CalpontSystemCatalog::VARBINARY))
{
if (value != NULL)
row.setVarBinaryField((const uint8_t*)value, length, i);
else
row.setVarBinaryField(NULL, 0, i);
}
else
{
CalpontSystemCatalog::ColType ct;
@@ -484,7 +491,7 @@ void CrossEngineStep::execute()
while ((rowIn = mysql->nextRow()) && !cancelled())
{
for(int i = 0; i < num_fields; i++)
setField(i, rowIn[i], fRowDelivered);
setField(i, rowIn[i], mysql->getFieldLength(i), fRowDelivered);
addRow(rgDataDelivered);
}
@@ -504,7 +511,7 @@ void CrossEngineStep::execute()
for(int i = 0; i < num_fields; i++)
{
if (fFe1Column[i] != -1)
setField(fFe1Column[i], rowIn[i], rowFe1);
setField(fFe1Column[i], rowIn[i], mysql->getFieldLength(i), rowFe1);
}
if (fFeFilters && fFeInstance->evaluate(rowFe1, fFeFilters.get()) == false)
@@ -518,7 +525,7 @@ void CrossEngineStep::execute()
for(int i = 0; i < num_fields; i++)
{
if (fFe1Column[i] == -1)
setField(i, rowIn[i], fRowDelivered);
setField(i, rowIn[i], mysql->getFieldLength(i), fRowDelivered);
}
addRow(rgDataDelivered);
@@ -536,7 +543,7 @@ void CrossEngineStep::execute()
while ((rowIn = mysql->nextRow()) && !cancelled())
{
for(int i = 0; i < num_fields; i++)
setField(i, rowIn[i], rowFe3);
setField(i, rowIn[i], mysql->getFieldLength(i), rowFe3);
fFeInstance->evaluate(rowFe3, fFeSelects);
fFeInstance->evaluate(rowFe3, fFeSelects);
@@ -567,7 +574,7 @@ void CrossEngineStep::execute()
for(int i = 0; i < num_fields; i++)
{
if (fFe1Column[i] != -1)
setField(fFe1Column[i], rowIn[i], rowFe1);
setField(fFe1Column[i], rowIn[i], mysql->getFieldLength(i), rowFe1);
}
if (fFeFilters && fFeInstance->evaluate(rowFe1, fFeFilters.get()) == false)
@@ -581,7 +588,7 @@ void CrossEngineStep::execute()
for(int i = 0; i < num_fields; i++)
{
if (fFe1Column[i] == -1)
setField(i, rowIn[i], rowFe3);
setField(i, rowIn[i], mysql->getFieldLength(i), rowFe3);
}
fFeInstance->evaluate(rowFe3, fFeSelects);

View File

@@ -60,13 +60,20 @@ public:
int getFieldCount() { return mysql_num_fields(fRes); }
int getRowCount() { return mysql_num_rows(fRes); }
char** nextRow() { return mysql_fetch_row(fRes); }
char** nextRow()
{
char** row = mysql_fetch_row(fRes);
fieldLengths = mysql_fetch_lengths(fRes);
return row;
}
long getFieldLength(int field) { return fieldLengths[field]; }
const std::string& getError() { return fErrStr; }
private:
MYSQL* fCon;
MYSQL_RES* fRes;
std::string fErrStr;
unsigned long *fieldLengths;
};
/** @brief class CrossEngineStep
@@ -150,7 +157,7 @@ protected:
virtual void makeMappings();
virtual void addFilterStr(const std::vector<const execplan::Filter*>&, const std::string&);
virtual std::string makeQuery();
virtual void setField(int, const char*, rowgroup::Row&);
virtual void setField(int, const char*, unsigned long, rowgroup::Row&);
inline void addRow(rowgroup::RGData &);
//inline void addRow(boost::shared_array<uint8_t>&);
virtual int64_t convertValueNum(

View File

@@ -324,7 +324,8 @@ void ExpressionStep::addColumn(ReturnedColumn* rc, JobInfo& jobInfo)
void ExpressionStep::populateColumnInfo(ReturnedColumn* rc, JobInfo& jobInfo)
{
// As of bug3695, make sure varbinary is not used in function expression.
if (rc->resultType().colDataType == CalpontSystemCatalog::VARBINARY && !fVarBinOK)
if ((rc->resultType().colDataType == CalpontSystemCatalog::VARBINARY ||
rc->resultType().colDataType == CalpontSystemCatalog::BLOB) && !fVarBinOK)
throw runtime_error("VARBINARY in filter or function is not supported.");
SimpleColumn* sc = dynamic_cast<SimpleColumn*>(rc);
@@ -344,7 +345,8 @@ void ExpressionStep::populateColumnInfo(ReturnedColumn* rc, JobInfo& jobInfo)
void ExpressionStep::populateColumnInfo(SimpleColumn* sc, JobInfo& jobInfo)
{
// As of bug3695, make sure varbinary is not used in function expression.
if (sc->resultType().colDataType == CalpontSystemCatalog::VARBINARY && !fVarBinOK)
if ((sc->resultType().colDataType == CalpontSystemCatalog::VARBINARY ||
sc->resultType().colDataType == CalpontSystemCatalog::BLOB) && !fVarBinOK)
throw runtime_error ("VARBINARY in filter or function is not supported.");
CalpontSystemCatalog::OID tblOid = joblist::tableOid(sc, jobInfo.csc);
@@ -409,7 +411,8 @@ void ExpressionStep::populateColumnInfo(SimpleColumn* sc, JobInfo& jobInfo)
void ExpressionStep::populateColumnInfo(WindowFunctionColumn* wc, JobInfo& jobInfo)
{
// As of bug3695, make sure varbinary is not used in function expression.
if (wc->resultType().colDataType == CalpontSystemCatalog::VARBINARY && !fVarBinOK)
if ((wc->resultType().colDataType == CalpontSystemCatalog::VARBINARY ||
wc->resultType().colDataType == CalpontSystemCatalog::BLOB) && !fVarBinOK)
throw runtime_error("VARBINARY in filter or function is not supported.");
// This is for window function in IN/EXISTS sub-query.
@@ -434,7 +437,8 @@ void ExpressionStep::populateColumnInfo(WindowFunctionColumn* wc, JobInfo& jobIn
void ExpressionStep::populateColumnInfo(AggregateColumn* ac, JobInfo& jobInfo)
{
// As of bug3695, make sure varbinary is not used in function expression.
if (ac->resultType().colDataType == CalpontSystemCatalog::VARBINARY && !fVarBinOK)
if ((ac->resultType().colDataType == CalpontSystemCatalog::VARBINARY ||
ac->resultType().colDataType == CalpontSystemCatalog::BLOB) && !fVarBinOK)
throw runtime_error("VARBINARY in filter or function is not supported.");
// This is for aggregate function in IN/EXISTS sub-query.

View File

@@ -171,7 +171,8 @@ pColScanStep::pColScanStep(
}
//If this is a dictionary column, fudge the numbers...
if (fColType.colDataType == CalpontSystemCatalog::VARBINARY)
if ((fColType.colDataType == CalpontSystemCatalog::VARBINARY)
|| (fColType.colDataType == CalpontSystemCatalog::BLOB))
{
fColType.colWidth = 8;
fIsDict = true;

View File

@@ -169,7 +169,8 @@ pColStep::pColStep(
}
//If this is a dictionary column, fudge the numbers...
if (fColType.colDataType == CalpontSystemCatalog::VARBINARY)
if ((fColType.colDataType == CalpontSystemCatalog::VARBINARY)
|| (fColType.colDataType == CalpontSystemCatalog::BLOB))
{
fColType.colWidth = 8;
fIsDict = true;

View File

@@ -1585,6 +1585,49 @@ int ha_calpont_impl_write_batch_row_(uchar *buf, TABLE* table, cal_impl_if::cal_
buf += ci.columnTypes[colpos].colWidth;
break;
}
case CalpontSystemCatalog::BLOB:
{
int dataLength = 0;
uintptr_t *dataptr;
uchar *ucharptr;
if (ci.columnTypes[colpos].colWidth < 256)
{
dataLength = *(int8_t*) buf;
buf++;
}
else if (ci.columnTypes[colpos].colWidth < 65535)
{
dataLength = *(int16_t*) buf;
buf = buf + 2 ;
}
else if (ci.columnTypes[colpos].colWidth < 16777216)
{
dataLength = *(int32_t*) buf;
buf = buf + 3 ;
}
else
{
dataLength = *(int32_t*) buf;
buf = buf + 4 ;
}
// buf contains pointer to blob, for example:
// (gdb) p (char*)*(uintptr_t*)buf
// $43 = 0x7f68500c58f8 "hello world"
dataptr = (uintptr_t*)buf;
ucharptr = (uchar*)*dataptr;
buf+= sizeof(uintptr_t);
for (int32_t i=0; i<dataLength; i++)
{
fprintf(ci.filePtr, "%02x", *(uint8_t*)ucharptr);
ucharptr++;
}
fprintf(ci.filePtr, "%c", ci.delimiter);
break;
}
default: // treat as int64
{
break;

View File

@@ -2233,10 +2233,10 @@ CalpontSystemCatalog::ColType colType_MysqlToIDB (const Item* item)
ct.colDataType = CalpontSystemCatalog::DATETIME;
ct.colWidth = 8;
}
if (item->field_type() == MYSQL_TYPE_BLOB)
{
throw runtime_error ("BLOB/TEXT data types are not supported by ColumnStore.");
}
if (item->field_type() == MYSQL_TYPE_BLOB)
{
ct.colDataType = CalpontSystemCatalog::BLOB;
}
}
break;
/* FIXME:

View File

@@ -467,6 +467,7 @@ int fetchNextRow(uchar *buf, cal_table_info& ti, cal_connection_info* ci)
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR:
{
// TODO: use getStringPointer instead of getStringField to stop the string copies
Field_varstring* f2 = (Field_varstring*)*f;
switch (colType.colWidth)
{
@@ -622,6 +623,14 @@ int fetchNextRow(uchar *buf, cal_table_info& ti, cal_connection_info* ci)
storeNumericField(f, intColVal, colType);
break;
}
case CalpontSystemCatalog::BLOB:
{
Field_blob *f2 = (Field_blob*)*f;
f2->set_ptr(row.getVarBinaryLength(s), (unsigned char*)row.getVarBinaryField(s));
if ((*f)->null_ptr)
*(*f)->null_ptr &= ~(*f)->null_bit;
break;
}
default: // treat as int64
{
intColVal = row.getUintField<8>(s);

View File

@@ -249,6 +249,7 @@ inline bool isEmptyVal<8>(uint8_t type, const uint8_t* ival)
case CalpontSystemCatalog::DATE:
case CalpontSystemCatalog::DATETIME:
case CalpontSystemCatalog::VARBINARY:
case CalpontSystemCatalog::BLOB:
return (*val == joblist::CHAR8EMPTYROW);
case CalpontSystemCatalog::UBIGINT:
return (joblist::UBIGINTEMPTYROW == *val);
@@ -271,6 +272,7 @@ inline bool isEmptyVal<4>(uint8_t type, const uint8_t* ival)
return (joblist::FLOATEMPTYROW == *val);
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR:
case CalpontSystemCatalog::BLOB:
case CalpontSystemCatalog::DATE:
case CalpontSystemCatalog::DATETIME:
return (joblist::CHAR4EMPTYROW == *val);
@@ -292,6 +294,7 @@ inline bool isEmptyVal<2>(uint8_t type, const uint8_t* ival)
{
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR:
case CalpontSystemCatalog::BLOB:
case CalpontSystemCatalog::DATE:
case CalpontSystemCatalog::DATETIME:
return (joblist::CHAR2EMPTYROW == *val);
@@ -313,6 +316,7 @@ inline bool isEmptyVal<1>(uint8_t type, const uint8_t* ival)
{
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR:
case CalpontSystemCatalog::BLOB:
case CalpontSystemCatalog::DATE:
case CalpontSystemCatalog::DATETIME:
return (*val == joblist::CHAR1EMPTYROW);
@@ -343,6 +347,7 @@ inline bool isNullVal<8>(uint8_t type, const uint8_t* ival)
case CalpontSystemCatalog::DATE:
case CalpontSystemCatalog::DATETIME:
case CalpontSystemCatalog::VARBINARY:
case CalpontSystemCatalog::BLOB:
//@bug 339 might be a token here
//TODO: what's up with the second const here?
return (*val == joblist::CHAR8NULL || 0xFFFFFFFFFFFFFFFELL == *val);
@@ -367,6 +372,7 @@ inline bool isNullVal<4>(uint8_t type, const uint8_t* ival)
return (joblist::FLOATNULL == *val);
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR:
case CalpontSystemCatalog::BLOB:
return (joblist::CHAR4NULL == *val);
case CalpontSystemCatalog::DATE:
case CalpontSystemCatalog::DATETIME:
@@ -389,6 +395,7 @@ inline bool isNullVal<2>(uint8_t type, const uint8_t* ival)
{
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR:
case CalpontSystemCatalog::BLOB:
case CalpontSystemCatalog::DATE:
case CalpontSystemCatalog::DATETIME:
return (joblist::CHAR2NULL == *val);
@@ -410,6 +417,7 @@ inline bool isNullVal<1>(uint8_t type, const uint8_t* ival)
{
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR:
case CalpontSystemCatalog::BLOB:
case CalpontSystemCatalog::DATE:
case CalpontSystemCatalog::DATETIME:
return (*val == joblist::CHAR1NULL);
@@ -451,6 +459,7 @@ inline bool isMinMaxValid(const NewColRequestHeader *in) {
case CalpontSystemCatalog::CHAR:
return (in->DataSize<9);
case CalpontSystemCatalog::VARCHAR:
case CalpontSystemCatalog::BLOB:
return (in->DataSize<8);
case CalpontSystemCatalog::TINYINT:
case CalpontSystemCatalog::SMALLINT:
@@ -505,7 +514,7 @@ inline bool colCompare(int64_t val1, int64_t val2, uint8_t COP, uint8_t rf, int
return colCompare_(dVal1, dVal2, COP);
}
else if ( (type == CalpontSystemCatalog::CHAR || type == CalpontSystemCatalog::VARCHAR) && !isNull )
else if ( (type == CalpontSystemCatalog::CHAR || type == CalpontSystemCatalog::VARCHAR || type == CalpontSystemCatalog::BLOB) && !isNull )
{
if (!regex.used && !rf)
return colCompare_(order_swap(val1), order_swap(val2), COP);
@@ -1180,7 +1189,7 @@ inline void p_Col_ridArray(NewColRequestHeader *in,
if (out->ValidMinMax && !isNull && !isEmpty)
{
if ((in->DataType == CalpontSystemCatalog::CHAR || in->DataType == CalpontSystemCatalog::VARCHAR ) && 1 < W)
if ((in->DataType == CalpontSystemCatalog::CHAR || in->DataType == CalpontSystemCatalog::VARCHAR || in->DataType == CalpontSystemCatalog::BLOB ) && 1 < W)
{
if (colCompare(out->Min, val, COMPARE_GT, false, in->DataType, W, placeholderRegex))
out->Min = val;

View File

@@ -425,7 +425,12 @@ again:
// is larger than the number of signatures in this block. Return a "special" string so that
// the query keeps going, but that can be recognized as an internal error upon inspection.
//@Bug 2534. Change the length check to 8000
if (ret->len < 0 || ret->len > 8001)
// MCOL-267:
// With BLOB support we have had to increase this to 8176
// because a BLOB can take 8176 bytes of a dictionary block
// instead of the fixed 8000 with CHAR/VARCHAR
if (ret->len < 0 || ret->len > 8176)
{
ret->data = reinterpret_cast<const uint8_t*>(signatureNotFound);
ret->len = strlen(reinterpret_cast<const char*>(ret->data));

View File

@@ -866,6 +866,7 @@ const uint64_t ColumnCommand::getEmptyRowValue( const execplan::CalpontSystemCat
case execplan::CalpontSystemCatalog::DATE :
case execplan::CalpontSystemCatalog::DATETIME :
case execplan::CalpontSystemCatalog::VARBINARY :
case execplan::CalpontSystemCatalog::BLOB :
default:
emptyVal = joblist::CHAR1EMPTYROW;
if ( width == (2 + offset) )

View File

@@ -332,7 +332,7 @@ void DictStep::_execute()
i = 0;
while (i < bpp->ridCount) {
l_lbid = ((int64_t) newRidList[i].token) >> 10;
primMsg->LBID = l_lbid;
primMsg->LBID = (l_lbid == -1) ? l_lbid : l_lbid & 0xFFFFFFFFFL;
primMsg->NVALS = 0;
/* When this is used as a filter, the strings can be thrown out. JLF currently
@@ -399,7 +399,7 @@ void DictStep::_project()
i = 0;
while (i < bpp->ridCount) {
l_lbid = ((int64_t) newRidList[i].token) >> 10;
primMsg->LBID = l_lbid;
primMsg->LBID = (l_lbid == -1) ? l_lbid : l_lbid & 0xFFFFFFFFFL;
primMsg->NVALS = 0;
primMsg->OutputType = OT_DATAVALUE;
pt = (OldGetSigParams *) (primMsg->tokens);
@@ -435,7 +435,7 @@ void DictStep::_projectToRG(RowGroup &rg, uint32_t col)
int64_t l_lbid=0;
int64_t o_lbid=0;
OldGetSigParams *pt;
StringPtr tmpStrings[LOGICAL_BLOCK_RIDS];
StringPtr *tmpStrings = new StringPtr[LOGICAL_BLOCK_RIDS];
rowgroup::Row r;
boost::scoped_array<OrderedToken> newRidList;
@@ -456,7 +456,7 @@ void DictStep::_projectToRG(RowGroup &rg, uint32_t col)
//cout << "DS: projectingToRG rids: " << bpp->ridCount << endl;
while (i < bpp->ridCount) {
l_lbid = ((int64_t) newRidList[i].token) >> 10;
primMsg->LBID = l_lbid;
primMsg->LBID = (l_lbid == -1) ? l_lbid : l_lbid & 0xFFFFFFFFFL;
primMsg->NVALS = 0;
primMsg->OutputType = OT_DATAVALUE;
pt = (OldGetSigParams *) (primMsg->tokens);
@@ -499,7 +499,7 @@ void DictStep::_projectToRG(RowGroup &rg, uint32_t col)
}
if (((int64_t)primMsg->LBID)<0 && o_lbid>0)
primMsg->LBID = o_lbid;
primMsg->LBID = o_lbid & 0xFFFFFFFFFL;
memcpy(&pt[primMsg->NVALS], filterString.buf(), filterString.length());
issuePrimitive(false);
@@ -509,7 +509,8 @@ void DictStep::_projectToRG(RowGroup &rg, uint32_t col)
// bug 4901 - move this inside the loop and call incrementally
// to save the unnecessary string copy
if (rg.getColTypes()[col] != execplan::CalpontSystemCatalog::VARBINARY) {
if ((rg.getColTypes()[col] != execplan::CalpontSystemCatalog::VARBINARY) &&
(rg.getColTypes()[col] != execplan::CalpontSystemCatalog::BLOB)) {
for (i = curResultCounter; i < tmpResultCounter; i++) {
rg.getRow(newRidList[i].pos, &r);
//cout << "serializing " << tmpStrings[i] << endl;
@@ -517,9 +518,40 @@ void DictStep::_projectToRG(RowGroup &rg, uint32_t col)
}
}
else {
for (i = curResultCounter; i < tmpResultCounter; i++) {
uint32_t firstTmpResultCounter = tmpResultCounter;
for (i = curResultCounter; i < firstTmpResultCounter; i++) {
rg.getRow(newRidList[i].pos, &r);
r.setVarBinaryField(tmpStrings[i].ptr, tmpStrings[i].len, col);
// If this is a multi-block blob, get all the blocks
// We do string copy here, should maybe have a RowGroup
// function to append strings or something?
if (((newRidList[i].token >> 46) < 0x3FFFF) &&
((newRidList[i].token >> 46) > 0))
{
StringPtr multi_part[1];
uint16_t old_offset = primMsg->tokens[0].offset;
string *result = new string((char*)tmpStrings[i].ptr, tmpStrings[i].len);
uint64_t origin_lbid = primMsg->LBID;
uint32_t lbid_count = newRidList[i].token >> 46;
primMsg->tokens[0].offset = 1; // first offset of a sig
for (uint32_t j = 1; j <= lbid_count; j++)
{
tmpResultCounter = 0;
primMsg->LBID = origin_lbid + j;
primMsg->NVALS = 1;
primMsg->tokens[0].LBID = origin_lbid + j;
issuePrimitive(false);
projectResult(multi_part);
result->append((char*)multi_part[0].ptr, multi_part[0].len);
}
primMsg->tokens[0].offset = old_offset;
tmpResultCounter = firstTmpResultCounter;
r.setVarBinaryField((unsigned char*)result->c_str(), result->length(), col);
delete result;
}
else
{
r.setVarBinaryField(tmpStrings[i].ptr, tmpStrings[i].len, col);
}
}
}
curResultCounter = tmpResultCounter;
@@ -528,6 +560,7 @@ void DictStep::_projectToRG(RowGroup &rg, uint32_t col)
//cout << "_projectToRG() total length = " << totalResultLength << endl;
idbassert(tmpResultCounter == bpp->ridCount);
delete [] tmpStrings;
//cout << "DS: /projectingToRG l: " << (int64_t)primMsg->LBID
// << " len: " << tmpResultCounter
// << endl;

View File

@@ -94,7 +94,8 @@ Command* FilterCommand::makeFilterCommand(ByteStream& bs, vector<SCommand>& cmds
// char[] is stored as int, but cannot directly compare if length is different
// due to endian issue
if (cmd0->getColType().colDataType == execplan::CalpontSystemCatalog::CHAR ||
cmd0->getColType().colDataType == execplan::CalpontSystemCatalog::VARCHAR)
cmd0->getColType().colDataType == execplan::CalpontSystemCatalog::VARCHAR ||
cmd0->getColType().colDataType == execplan::CalpontSystemCatalog::BLOB)
{
StrFilterCmd* sc = new StrFilterCmd();
sc->setCompareFunc(CC);

View File

@@ -96,6 +96,7 @@ string Func_hex::getStrVal(rowgroup::Row& row,
break;
}
case CalpontSystemCatalog::VARBINARY:
case CalpontSystemCatalog::BLOB:
{
const string& arg = parm[0]->data()->getStrVal(row, isNull);
uint64_t hexLen = arg.size() * 2;

View File

@@ -48,7 +48,8 @@ int64_t Func_length::getIntVal(rowgroup::Row& row,
bool& isNull,
CalpontSystemCatalog::ColType&)
{
if (fp[0]->data()->resultType().colDataType == CalpontSystemCatalog::VARBINARY)
if ((fp[0]->data()->resultType().colDataType == CalpontSystemCatalog::VARBINARY) ||
(fp[0]->data()->resultType().colDataType == CalpontSystemCatalog::BLOB))
return fp[0]->data()->getStrVal(row, isNull).length();
return strlen(fp[0]->data()->getStrVal(row, isNull).c_str());

View File

@@ -266,6 +266,8 @@ void FuncExp::evaluate(rowgroup::Row& row, std::vector<execplan::SRCP>& expressi
}
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR:
// TODO: might not be right thing for BLOB
case CalpontSystemCatalog::BLOB:
{
const std::string& val = expression[i]->getStrVal(row, isNull);
if (isNull)

View File

@@ -38,6 +38,7 @@
using namespace std;
#include <boost/shared_array.hpp>
#include <boost/shared_ptr.hpp>
using namespace boost;
#include "bytestream.h"
@@ -73,9 +74,9 @@ StringStore::~StringStore()
uint64_t inUse = 0, allocated = 0;
for (i = 0; i < mem.size(); i++) {
MemChunk *tmp = (MemChunk *) mem.back().get();
inUse += tmp->currentSize;
allocated += tmp->capacity;
std::string *tmp = mem.back().get();
inUse += tmp->length();
allocated += tmp->length();
}
if (allocated > 0)
cout << "~SS: " << inUse << "/" << allocated << " = " << (float) inUse/(float) allocated << endl;
@@ -84,7 +85,6 @@ StringStore::~StringStore()
uint32_t StringStore::storeString(const uint8_t *data, uint32_t len)
{
MemChunk *lastMC = NULL;
uint32_t ret = 0;
empty = false; // At least a NULL is being stored.
@@ -102,31 +102,10 @@ uint32_t StringStore::storeString(const uint8_t *data, uint32_t len)
if (fUseStoreStringMutex)
lk.lock();
if (mem.size() > 0)
lastMC = (MemChunk *) mem.back().get();
shared_ptr<std::string> newString(new std::string((char*)data, len));
mem.push_back(newString);
if ((lastMC == NULL) || (lastMC->capacity - lastMC->currentSize < len)) {
// mem usage debugging
//if (lastMC)
//cout << "Memchunk efficiency = " << lastMC->currentSize << "/" << lastMC->capacity << endl;
shared_array<uint8_t> newOne(new uint8_t[CHUNK_SIZE + sizeof(MemChunk)]);
mem.push_back(newOne);
lastMC = (MemChunk *) mem.back().get();
lastMC->currentSize = 0;
lastMC->capacity = CHUNK_SIZE;
memset(lastMC->data, 0, CHUNK_SIZE);
}
ret = ((mem.size()-1) * CHUNK_SIZE) + lastMC->currentSize;
memcpy(&(lastMC->data[lastMC->currentSize]), data, len);
/*
cout << "stored: '" << hex;
for (uint32_t i = 0; i < len ; i++) {
cout << (char) lastMC->data[lastMC->currentSize + i];
}
cout << "' at position " << lastMC->currentSize << " len " << len << dec << endl;
*/
lastMC->currentSize += len;
ret = mem.size();
return ret;
}
@@ -134,16 +113,17 @@ uint32_t StringStore::storeString(const uint8_t *data, uint32_t len)
void StringStore::serialize(ByteStream &bs) const
{
uint32_t i;
MemChunk *mc;
std::string empty_str;
bs << (uint32_t) mem.size();
bs << (uint8_t) empty;
for (i = 0; i < mem.size(); i++) {
mc = (MemChunk *) mem[i].get();
bs << (uint32_t) mc->currentSize;
if (mem[i].get() == NULL)
bs << empty_str;
else
bs << *mem[i].get();
//cout << "serialized " << mc->currentSize << " bytes\n";
bs.append(mc->data, mc->currentSize);
}
}
}
uint32_t StringStore::deserialize(ByteStream &bs)
@@ -151,27 +131,22 @@ uint32_t StringStore::deserialize(ByteStream &bs)
uint32_t i;
uint32_t count;
uint32_t size;
uint8_t *buf;
MemChunk *mc;
std::string buf;
uint8_t tmp8;
uint32_t ret = 0;
//mem.clear();
bs >> count;
mem.resize(count);
mem.reserve(count);
bs >> tmp8;
empty = (bool) tmp8;
ret += 5;
for (i = 0; i < count; i++) {
bs >> size;
//cout << "deserializing " << size << " bytes\n";
buf = bs.buf();
mem[i].reset(new uint8_t[size + sizeof(MemChunk)]);
mc = (MemChunk *) mem[i].get();
mc->currentSize = size;
mc->capacity = size;
memcpy(mc->data, buf, size);
bs.advance(size);
bs >> buf;
shared_ptr<std::string> newString(new std::string(buf));
mem.push_back(newString);
//bs.advance(size);
ret += (size + 4);
}
return ret;
@@ -179,7 +154,7 @@ uint32_t StringStore::deserialize(ByteStream &bs)
void StringStore::clear()
{
vector<shared_array<uint8_t> > emptyv;
vector<shared_ptr<std::string> > emptyv;
mem.swap(emptyv);
empty = true;
}
@@ -365,7 +340,8 @@ string Row::toString() const
else
switch (types[i]) {
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR: {
case CalpontSystemCatalog::VARCHAR:
{
const string &tmp = getStringField(i);
os << "(" << getStringLength(i) << ") '" << tmp << "' ";
break;
@@ -381,7 +357,9 @@ string Row::toString() const
case CalpontSystemCatalog::LONGDOUBLE:
os << getLongDoubleField(i) << " ";
break;
case CalpontSystemCatalog::VARBINARY: {
case CalpontSystemCatalog::VARBINARY:
case CalpontSystemCatalog::BLOB:
{
uint32_t len = getVarBinaryLength(i);
const uint8_t* val = getVarBinaryField(i);
os << "0x" << hex;
@@ -429,7 +407,9 @@ string Row::toCSV() const
case CalpontSystemCatalog::LONGDOUBLE:
os << getLongDoubleField(i);
break;
case CalpontSystemCatalog::VARBINARY: {
case CalpontSystemCatalog::VARBINARY:
case CalpontSystemCatalog::BLOB:
{
uint32_t len = getVarBinaryLength(i);
const uint8_t* val = getVarBinaryField(i);
os << "0x" << hex;
@@ -532,6 +512,11 @@ void Row::initToNull()
memset(&data[offsets[i]], 0xFF, getColumnWidth(i));
break;
}
case CalpontSystemCatalog::BLOB: {
// TODO: no NULL value for long double yet, this is a nan.
memset(&data[offsets[i]], 0xFF, getColumnWidth(i));
break;
}
default:
ostringstream os;
os << "Row::initToNull(): got bad column type (" << types[i] <<
@@ -603,6 +588,7 @@ bool Row::isNullValue(uint32_t colIndex) const
}
break;
}
case CalpontSystemCatalog::BLOB:
case CalpontSystemCatalog::VARBINARY: {
uint32_t pos = offsets[colIndex];
if (inStringTable(colIndex)) {
@@ -1095,13 +1081,13 @@ void applyMapping(const int *mapping, const Row &in, Row *out)
for (i = 0; i < in.getColumnCount(); i++)
if (mapping[i] != -1)
{
if (UNLIKELY(in.isLongString(i)))
if (UNLIKELY(in.getColTypes()[i] == execplan::CalpontSystemCatalog::VARBINARY || in.getColTypes()[i] == execplan::CalpontSystemCatalog::BLOB))
out->setVarBinaryField(in.getVarBinaryField(i), in.getVarBinaryLength(i), mapping[i]);
else if (UNLIKELY(in.isLongString(i)))
out->setStringField(in.getStringPointer(i), in.getStringLength(i), mapping[i]);
//out->setStringField(in.getStringField(i), mapping[i]);
else if (UNLIKELY(in.isShortString(i)))
out->setUintField(in.getUintField(i), mapping[i]);
else if (UNLIKELY(in.getColTypes()[i] == execplan::CalpontSystemCatalog::VARBINARY))
out->setVarBinaryField(in.getVarBinaryField(i), in.getVarBinaryLength(i), mapping[i]);
else if (UNLIKELY(in.getColTypes()[i] == execplan::CalpontSystemCatalog::LONGDOUBLE))
out->setLongDoubleField(in.getLongDoubleField(i), mapping[i]);
else if (in.isUnsigned(i))

View File

@@ -119,14 +119,8 @@ private:
// This is an overlay b/c the underlying data needs to be any size,
// and alloc'd in one chunk. data can't be a sepatate dynamic chunk.
struct MemChunk
{
uint32_t currentSize;
uint32_t capacity;
uint8_t data[];
};
std::vector<boost::shared_array<uint8_t> > mem;
std::vector<boost::shared_ptr<std::string> > mem;
bool empty;
bool fUseStoreStringMutex; //@bug6065, make StringStore::storeString() thread safe
boost::mutex fMutex;
@@ -797,7 +791,8 @@ inline void Row::copyField(uint32_t destIndex, uint32_t srcIndex) const
inline void Row::copyField(Row &out, uint32_t destIndex, uint32_t srcIndex) const
{
if (UNLIKELY(types[srcIndex] == execplan::CalpontSystemCatalog::VARBINARY))
if (UNLIKELY(types[srcIndex] == execplan::CalpontSystemCatalog::VARBINARY ||
types[srcIndex] == execplan::CalpontSystemCatalog::BLOB))
out.setVarBinaryField(getVarBinaryStringField(srcIndex), destIndex);
else if (UNLIKELY(isLongString(srcIndex)))
out.setStringField(getStringPointer(srcIndex), getStringLength(srcIndex), destIndex);
@@ -1268,7 +1263,8 @@ inline bool RowGroup::isLongString(uint32_t colIndex) const
{
return ((getColumnWidth(colIndex) > 7 && types[colIndex] == execplan::CalpontSystemCatalog::VARCHAR) ||
(getColumnWidth(colIndex) > 8 && types[colIndex] == execplan::CalpontSystemCatalog::CHAR) ||
types[colIndex] == execplan::CalpontSystemCatalog::VARBINARY);
types[colIndex] == execplan::CalpontSystemCatalog::VARBINARY ||
types[colIndex] == execplan::CalpontSystemCatalog::BLOB);
}
inline bool RowGroup::usesStringTable() const
@@ -1421,7 +1417,7 @@ inline void copyRow(const Row &in, Row *out, uint32_t colCount)
}
for (uint32_t i = 0; i < colCount; i++) {
if (UNLIKELY(in.getColTypes()[i] == execplan::CalpontSystemCatalog::VARBINARY))
if (UNLIKELY(in.getColTypes()[i] == execplan::CalpontSystemCatalog::VARBINARY || in.getColTypes()[i] == execplan::CalpontSystemCatalog::BLOB))
out->setVarBinaryField(in.getVarBinaryStringField(i), i);
else if (UNLIKELY(in.isLongString(i)))
//out->setStringField(in.getStringField(i), i);
@@ -1445,17 +1441,13 @@ inline std::string StringStore::getString(uint32_t off, uint32_t len) const
if (off == std::numeric_limits<uint32_t>::max())
return joblist::CPNULLSTRMARK;
MemChunk *mc;
uint32_t chunk = off / CHUNK_SIZE;
uint32_t offset = off % CHUNK_SIZE;
// this has to handle uninitialized data as well. If it's uninitialized it doesn't matter
// what gets returned, it just can't go out of bounds.
if (mem.size() <= chunk)
if ((mem.size() < off) || off == 0)
return joblist::CPNULLSTRMARK;
mc = (MemChunk *) mem[chunk].get();
if ((offset + len) > mc->currentSize)
return joblist::CPNULLSTRMARK;
return std::string((char *) &(mc->data[offset]), len);
if (mem[off-1].get() == NULL)
return joblist::CPNULLSTRMARK;
return *mem[off-1].get();
}
inline const uint8_t * StringStore::getPointer(uint32_t off) const
@@ -1463,17 +1455,15 @@ inline const uint8_t * StringStore::getPointer(uint32_t off) const
if (off == std::numeric_limits<uint32_t>::max())
return (const uint8_t *) joblist::CPNULLSTRMARK.c_str();
uint32_t chunk = off / CHUNK_SIZE;
uint32_t offset = off % CHUNK_SIZE;
MemChunk *mc;
// this has to handle uninitialized data as well. If it's uninitialized it doesn't matter
// what gets returned, it just can't go out of bounds.
if (UNLIKELY(mem.size() <= chunk))
if (UNLIKELY(mem.size() < off))
return (const uint8_t *) joblist::CPNULLSTRMARK.c_str();
mc = (MemChunk *) mem[chunk].get();
if (offset > mc->currentSize)
return (const uint8_t *) joblist::CPNULLSTRMARK.c_str();
return &(mc->data[offset]);
if (off == 0 || (mem[off-1].get() == NULL))
return (const uint8_t *) joblist::CPNULLSTRMARK.c_str();
return (uint8_t*)mem[off-1].get()->c_str();
}
inline bool StringStore::isNullValue(uint32_t off, uint32_t len) const
@@ -1484,17 +1474,15 @@ inline bool StringStore::isNullValue(uint32_t off, uint32_t len) const
if (len < 8)
return false;
uint32_t chunk = off / CHUNK_SIZE;
uint32_t offset = off % CHUNK_SIZE;
MemChunk *mc;
if (mem.size() <= chunk)
if ((mem.size() < off) || off == 0)
return true;
mc = (MemChunk *) mem[chunk].get();
if ((offset + len) > mc->currentSize)
return true;
if (mc->data[offset] == 0) // "" = NULL string for some reason...
return true;
return (*((uint64_t *) &mc->data[offset]) == *((uint64_t *) joblist::CPNULLSTRMARK.c_str()));
if (mem[off-1].get() == NULL)
return true;
if (mem[off-1].get()->empty()) // Empty string is NULL
return true;
return (mem[off-1].get()->compare(joblist::CPNULLSTRMARK) == 0);
}
inline bool StringStore::equals(const std::string &str, uint32_t off, uint32_t len) const
@@ -1502,15 +1490,13 @@ inline bool StringStore::equals(const std::string &str, uint32_t off, uint32_t l
if (off == std::numeric_limits<uint32_t>::max() || len == 0)
return str == joblist::CPNULLSTRMARK;
uint32_t chunk = off / CHUNK_SIZE;
uint32_t offset = off % CHUNK_SIZE;
if (mem.size() <= chunk)
return false;
MemChunk *mc = (MemChunk *) mem[chunk].get();
if ((offset + len) > mc->currentSize)
if ((mem.size() < off) || off == 0)
return false;
return (strncmp(str.c_str(), (const char *) &mc->data[offset], len) == 0);
if (mem[off-1].get() == NULL)
return false;
return (mem[off-1].get()->compare(str) == 0);
}
inline bool StringStore::isEmpty() const
@@ -1522,11 +1508,9 @@ inline uint64_t StringStore::getSize() const
{
uint32_t i;
uint64_t ret = 0;
MemChunk *mc;
for (i = 0; i < mem.size(); i++) {
mc = (MemChunk *) mem[i].get();
ret += mc->capacity;
ret+= mem[i].get()->length();
}
return ret;
}

View File

@@ -1792,7 +1792,7 @@ int ColumnInfo::updateDctnryStore(char* buf,
// column.
// This only applies to default text mode. This step is bypassed for
// binary imports, because in that case, the data is already true binary.
if ((curCol.colType == WR_VARBINARY) &&
if (((curCol.colType == WR_VARBINARY) || (curCol.colType == WR_BLOB)) &&
(fpTableInfo->getImportDataMode() == IMPORT_DATA_TEXT))
{
#ifdef PROFILE

View File

@@ -64,7 +64,7 @@ namespace
const int START_HDR1 = // start loc of 2nd offset (HDR1)
HDR_UNIT_SIZE + NEXT_PTR_BYTES + HDR_UNIT_SIZE;
const int PSEUDO_COL_WIDTH = DICT_COL_WIDTH; // used to convert row count to block count
const int MAX_BLOB_SIZE = 2100000000; // for safety, we use an 18bit block count of 8KB blocks
}
namespace WriteEngine
@@ -620,17 +620,105 @@ bool Dctnry::getTokenFromArray(Signature& sig)
* token - token that was assigned to the inserted signature
*
* RETURN:
* none
* success - successfully write the signature to the block
* failure - failed to extend/create an extent for the block
******************************************************************************/
void Dctnry::insertDctnry2(Signature& sig)
int Dctnry::insertDctnry2(Signature& sig)
{
insertDctnryHdr(m_curBlock.data,
sig.size);
insertSgnture(m_curBlock.data, sig.size, (unsigned char*)sig.signature);
int rc = 0;
int write_size;
bool lbid_in_token = false;
sig.token.fbo = m_curLbid;
sig.token.op = m_curOp;
sig.token.spare = 0U;
sig.token.bc = 0;
while (sig.size > 0)
{
if (sig.size > (m_freeSpace - m_totalHdrBytes))
{
write_size = (m_freeSpace - m_totalHdrBytes);
}
else
{
write_size = sig.size;
}
insertDctnryHdr(m_curBlock.data, write_size);
insertSgnture(m_curBlock.data, write_size, (unsigned char*)sig.signature);
sig.size -= write_size;
sig.signature += write_size;
m_curFbo = m_lastFbo;
if (!lbid_in_token)
{
sig.token.fbo = m_curLbid;
sig.token.op = m_curOp;
lbid_in_token = true;
}
if (sig.size > 0)
{
CommBlock cb;
cb.file.oid = m_dctnryOID;
cb.file.pFile = m_dFile;
sig.token.bc++;
RETURN_ON_ERROR( writeDBFileNoVBCache(cb, &m_curBlock, m_curFbo) );
memset( m_curBlock.data, 0, sizeof(m_curBlock.data));
memcpy( m_curBlock.data, &m_dctnryHeader2, m_totalHdrBytes);
m_freeSpace = BYTE_PER_BLOCK - m_totalHdrBytes;
m_curBlock.state = BLK_WRITE;
m_curOp =0;
m_lastFbo++;
m_curFbo = m_lastFbo;
//...Expand current extent if it is an abbreviated initial extent
if ((m_curFbo == m_numBlocks) &&
(m_numBlocks == NUM_BLOCKS_PER_INITIAL_EXTENT))
{
RETURN_ON_ERROR( expandDctnryExtent() );
}
//...Allocate a new extent if we have reached the last block in the
// current extent.
if (m_curFbo == m_numBlocks)
{//last block
//for roll back the extent to use
//Save those empty extents in case of failure to rollback
std::vector<ExtentInfo> dictExtentInfo;
ExtentInfo info;
info.oid = m_dctnryOID;
info.partitionNum = m_partition;
info.segmentNum = m_segment;
info.dbRoot = m_dbRoot;
info.hwm = m_hwm;
info.newFile = false;
dictExtentInfo.push_back (info);
LBID_t startLbid;
// Add an extent.
rc = createDctnry(m_dctnryOID,
0, // dummy column width
m_dbRoot,
m_partition,
m_segment,
startLbid,
false) ;
if ( rc != NO_ERROR )
{
//roll back the extent
BRMWrapper::getInstance()->deleteEmptyDictStoreExtents(
dictExtentInfo);
return rc;
}
}
RETURN_ON_ERROR( BRMWrapper::getInstance()->getBrmInfo(m_dctnryOID,
m_partition, m_segment,
m_curFbo, m_curLbid) );
m_curBlock.lbid = m_curLbid;
}
}
return NO_ERROR;
}
/*******************************************************************************
@@ -706,7 +794,7 @@ int Dctnry::insertDctnry(const char* buf,
// still check against max size & set to null token if needed.
if ((curSig.size == 0) ||
(curSig.size == COLPOSPAIR_NULL_TOKEN_OFFSET) ||
(curSig.size > MAX_SIGNATURE_SIZE))
(curSig.size > MAX_BLOB_SIZE))
{
if (m_defVal.length() > 0) // use default string if available
{
@@ -736,7 +824,9 @@ int Dctnry::insertDctnry(const char* buf,
}
//...Search for the string in our string cache
if (m_arraySize < MAX_STRING_CACHE_SIZE)
//if it fits into one block (< 8KB)
if ((m_arraySize < MAX_STRING_CACHE_SIZE) &&
(curSig.size <= MAX_SIGNATURE_SIZE))
{
//Stats::startParseEvent("getTokenFromArray");
found = getTokenFromArray(curSig);
@@ -750,14 +840,15 @@ int Dctnry::insertDctnry(const char* buf,
}
//Stats::stopParseEvent("getTokenFromArray");
}
totalUseSize = HDR_UNIT_SIZE + curSig.size;
totalUseSize = m_totalHdrBytes + curSig.size;
//...String not found in cache, so proceed.
// If room is available in current block then insert into block.
// @bug 3960: Add MAX_OP_COUNT check to handle case after bulk rollback
if( (totalUseSize <= m_freeSpace) &&
if( ((totalUseSize <= m_freeSpace) ||
((curSig.size > 8176) && (m_freeSpace > m_totalHdrBytes))) &&
(m_curOp < (MAX_OP_COUNT-1)) ) {
insertDctnry2(curSig); //m_freeSpace updated!
RETURN_ON_ERROR(insertDctnry2(curSig)); //m_freeSpace updated!
m_curBlock.state = BLK_WRITE;
memcpy( pOut + outOffset, &curSig.token, 8 );
outOffset += 8;
@@ -778,7 +869,9 @@ int Dctnry::insertDctnry(const char* buf,
}
//...Add string to cache, if we have not exceeded cache limit
if (m_arraySize < MAX_STRING_CACHE_SIZE)
// Don't cache big blobs
if ((m_arraySize < MAX_STRING_CACHE_SIZE) &&
(curSig.size <= MAX_SIGNATURE_SIZE))
{
addToStringCache( curSig );
}
@@ -878,14 +971,15 @@ int Dctnry::insertDctnry(const char* buf,
// we need to add the string to the new block.
if (!found)
{
insertDctnry2(curSig); //m_freeSpace updated!
RETURN_ON_ERROR(insertDctnry2(curSig)); //m_freeSpace updated!
m_curBlock.state = BLK_WRITE;
memcpy( pOut + outOffset, &curSig.token, 8 );
outOffset += 8;
startPos++;
//...Add string to cache, if we have not exceeded cache limit
if (m_arraySize < MAX_STRING_CACHE_SIZE)
if ((m_arraySize < MAX_STRING_CACHE_SIZE) &&
(curSig.size <= MAX_SIGNATURE_SIZE))
{
addToStringCache( curSig );
}
@@ -943,9 +1037,12 @@ int Dctnry::insertDctnry(const int& sgnature_size,
int i;
unsigned char* value = NULL;
int size;
if (sgnature_size > MAX_SIGNATURE_SIZE)
int write_size;
bool lbid_in_token = false;
// Round down for safety. In theory we can take 262143 * 8176 bytes
if (sgnature_size > MAX_BLOB_SIZE)
{
return ERR_DICT_SIZE_GT_8000;
return ERR_DICT_SIZE_GT_2G;
}
if (sgnature_size == 0)
{
@@ -960,23 +1057,41 @@ int Dctnry::insertDctnry(const int& sgnature_size,
size = sgnature_size;
value = (unsigned char*)sgnature_value;
token.bc = 0;
for (i = m_lastFbo; i < m_numBlocks; i++)
{
// @bug 3960: Add MAX_OP_COUNT check to handle case after bulk rollback
if( (m_freeSpace>= (size + HDR_UNIT_SIZE)) &&
if( ((m_freeSpace>= (size + m_totalHdrBytes)) ||
((size > 8176) && (m_freeSpace > m_totalHdrBytes))) &&
(m_curOp < (MAX_OP_COUNT-1)) )
{ // found the perfect block; signature size fit in this block
insertDctnryHdr(m_curBlock.data, size);
insertSgnture(m_curBlock.data, size, value);
if (size > (m_freeSpace - m_totalHdrBytes))
{
write_size = (m_freeSpace - m_totalHdrBytes);
}
else
{
write_size = size;
}
insertDctnryHdr(m_curBlock.data, write_size);
insertSgnture(m_curBlock.data, write_size, value);
size -= write_size;
value += write_size;
m_curBlock.state = BLK_WRITE;
token.fbo = m_curLbid;
token.op = m_curOp;
token.spare = 0;
// We only want the start LBID for a multi-block dict in the token
if (!lbid_in_token)
{
token.fbo = m_curLbid;
token.op = m_curOp;
lbid_in_token = true;
}
if (size > 0)
token.bc++;
m_lastFbo = i;
m_curFbo = m_lastFbo;
if (m_curOp < (MAX_OP_COUNT-1))
if ((m_curOp < (MAX_OP_COUNT-1)) && (size <= 0))
return NO_ERROR;
}//end Found

View File

@@ -239,7 +239,7 @@ protected:
// insertDctnryHdr inserts the new value info into the header.
// insertSgnture inserts the new value into the block.
//
void insertDctnry2(Signature& sig);
int insertDctnry2(Signature& sig);
void insertDctnryHdr( unsigned char* blockBuf, const int& size);
void insertSgnture(unsigned char* blockBuf,
const int& size, unsigned char*value);

View File

@@ -450,7 +450,8 @@ uint8_t WE_DDLCommandProc::writeCreateSyscolumn(ByteStream& bs, std::string &err
bool hasDict = false;
if ( (dataType == CalpontSystemCatalog::CHAR && colDefPtr->fType->fLength > 8) ||
(dataType == CalpontSystemCatalog::VARCHAR && colDefPtr->fType->fLength > 7) ||
(dataType == CalpontSystemCatalog::VARBINARY && colDefPtr->fType->fLength > 7) )
(dataType == CalpontSystemCatalog::VARBINARY && colDefPtr->fType->fLength > 7) ||
(dataType == CalpontSystemCatalog::BLOB && colDefPtr->fType->fLength > 7) )
{
hasDict = true;
dictOID.compressionType = colDefPtr->fType->fCompressiontype;
@@ -459,17 +460,20 @@ uint8_t WE_DDLCommandProc::writeCreateSyscolumn(ByteStream& bs, std::string &err
dictcol++;
//@Bug 2534. Take away the limit of 255 and set the limit to 8000.
if (colDefPtr->fType->fLength > 8000)
if ((colDefPtr->fType->fLength > 8000) &&
(dataType != CalpontSystemCatalog::BLOB))
{
ostringstream os;
os << "char, varchar and varbinary length may not exceed 8000";
throw std::runtime_error(os.str());
}
}
else if (dataType == CalpontSystemCatalog::VARBINARY && colDefPtr->fType->fLength <= 7)
else if ((dataType == CalpontSystemCatalog::VARBINARY
|| dataType == CalpontSystemCatalog::BLOB)
&& colDefPtr->fType->fLength <= 7)
{
ostringstream os;
os << "varbinary length may not be less than 8";
os << "varbinary and blob length may not be less than 8";
throw std::runtime_error(os.str());
}
@@ -514,7 +518,8 @@ uint8_t WE_DDLCommandProc::writeCreateSyscolumn(ByteStream& bs, std::string &err
//@Bug 2089 Disallow zero length char and varch column to be created
if (dataType == CalpontSystemCatalog::CHAR ||
dataType == CalpontSystemCatalog::VARCHAR ||
dataType == CalpontSystemCatalog::VARBINARY)
dataType == CalpontSystemCatalog::VARBINARY ||
dataType == CalpontSystemCatalog::BLOB)
{
if (colDefPtr->fType->fLength <= 0)
{
@@ -829,17 +834,20 @@ uint8_t WE_DDLCommandProc::writeSyscolumn(ByteStream& bs, std::string & err)
dictOID.dictOID = dictoid;
//@Bug 2534. Take away the limit of 255 and set the limit to 8000.
if (colDefPtr->fType->fLength > 8000)
if ((colDefPtr->fType->fLength > 8000) &&
(dataType != CalpontSystemCatalog::BLOB))
{
ostringstream os;
os << "char, varchar and varbinary length may not exceed 8000";
throw std::runtime_error(os.str());
}
}
else if (dataType == CalpontSystemCatalog::VARBINARY && colDefPtr->fType->fLength <= 7)
else if ((dataType == CalpontSystemCatalog::VARBINARY
|| dataType == CalpontSystemCatalog::BLOB)
&& colDefPtr->fType->fLength <= 7)
{
ostringstream os;
os << "varbinary length may not be less than 8";
os << "varbinary and blob length may not be less than 8";
throw std::runtime_error(os.str());
}
@@ -885,7 +893,8 @@ uint8_t WE_DDLCommandProc::writeSyscolumn(ByteStream& bs, std::string & err)
//@Bug 2089 Disallow zero length char and varch column to be created
if (dataType == CalpontSystemCatalog::CHAR ||
dataType == CalpontSystemCatalog::VARCHAR ||
dataType == CalpontSystemCatalog::VARBINARY)
dataType == CalpontSystemCatalog::VARBINARY ||
dataType == CalpontSystemCatalog::BLOB)
{
if (colDefPtr->fType->fLength <= 0)
{
@@ -2242,6 +2251,8 @@ uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string
&& column.colType.colWidth > 7)
|| (column.colType.colDataType == CalpontSystemCatalog::VARBINARY
&& column.colType.colWidth > 7)
|| (column.colType.colDataType == CalpontSystemCatalog::BLOB
&& column.colType.colWidth > 7)
|| (column.colType.colDataType == CalpontSystemCatalog::DECIMAL
&& column.colType.precision > 18)
|| (column.colType.colDataType == CalpontSystemCatalog::UDECIMAL
@@ -2293,7 +2304,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string
//It's the same string for each column, so we just need one dictionary struct
memset(&dictTuple, 0, sizeof(dictTuple));
memcpy(dictTuple.sigValue, newTablename.c_str(), newTablename.length());
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
dictTuple.sigSize = newTablename.length();
dictTuple.isNull = false;
dctColList = dictTuple;
@@ -2628,7 +2639,7 @@ uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string &
dictStruct.columnOid = column.colType.columnOID;
WriteEngine::DctnryTuple dictTuple;
dictTuple.isNull = false;
memcpy(dictTuple.sigValue, newTablename.c_str(), newTablename.length());
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
dictTuple.sigSize = newTablename.length();
if (idbdatafile::IDBPolicy::useHdfs())
@@ -2844,7 +2855,7 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string
dictStruct.columnOid = column.colType.columnOID;
WriteEngine::DctnryTuple dictTuple;
dictTuple.isNull = false;
memcpy(dictTuple.sigValue, newTablename.c_str(), newTablename.length());
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
dictTuple.sigSize = newTablename.length();
//int error = NO_ERROR;
//if (NO_ERROR != (error = fWEWrapper.tokenize(txnID, dictStruct, dictTuple)))
@@ -2998,6 +3009,8 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string
&& column.colType.colWidth > 7)
|| (column.colType.colDataType == CalpontSystemCatalog::VARBINARY
&& column.colType.colWidth > 7)
|| (column.colType.colDataType == CalpontSystemCatalog::BLOB
&& column.colType.colWidth > 7)
|| (column.colType.colDataType == CalpontSystemCatalog::DECIMAL
&& column.colType.precision > 18)
|| (column.colType.colDataType == CalpontSystemCatalog::UDECIMAL
@@ -3015,7 +3028,7 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string
//Tokenize the data value
dictStruct.dctnryOid = column.colType.ddn.dictOID;
dictStruct.columnOid = column.colType.columnOID;
memcpy(dictTuple.sigValue, newTablename.c_str(), newTablename.length());
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
dictTuple.sigSize = newTablename.length();
dictTuple.isNull = false;
/*
@@ -3066,7 +3079,7 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string
//It's the same string for each column, so we just need one dictionary struct
memset(&dictTuple, 0, sizeof(dictTuple));
memcpy(dictTuple.sigValue, newTablename.c_str(), newTablename.length());
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
dictTuple.sigSize = newTablename.length();
dictTuple.isNull = false;
dctColList = dictTuple;
@@ -3878,6 +3891,8 @@ uint8_t WE_DDLCommandProc::updateSyscolumnSetDefault(messageqcpp::ByteStream& bs
&& column.colType.colWidth > 7)
|| (column.colType.colDataType == CalpontSystemCatalog::VARBINARY
&& column.colType.colWidth > 7)
|| (column.colType.colDataType == CalpontSystemCatalog::BLOB
&& column.colType.colWidth > 7)
|| (column.colType.colDataType == CalpontSystemCatalog::DECIMAL
&& column.colType.precision > 18)
|| (column.colType.colDataType == CalpontSystemCatalog::UDECIMAL
@@ -3906,7 +3921,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnSetDefault(messageqcpp::ByteStream& bs
else
{
WriteEngine::DctnryTuple dictTuple;
memcpy(dictTuple.sigValue, defaultvalue.c_str(), defaultvalue.length());
dictTuple.sigValue = (unsigned char*)defaultvalue.c_str();
dictTuple.sigSize = defaultvalue.length();
dictTuple.isNull = false;
int error = NO_ERROR;
@@ -3952,7 +3967,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnSetDefault(messageqcpp::ByteStream& bs
WriteEngine::DctnryTuple dctnryTuple;
if(defaultvalue.length() > 0)
{
memcpy(dctnryTuple.sigValue, defaultvalue.c_str(), defaultvalue.length());
dctnryTuple.sigValue = (unsigned char*)defaultvalue.c_str();
dctnryTuple.sigSize = defaultvalue.length();
dctnryTuple.isNull = false;
}
@@ -4146,6 +4161,8 @@ uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream&
&& column1.colType.colWidth > 7)
|| (column1.colType.colDataType == CalpontSystemCatalog::VARBINARY
&& column1.colType.colWidth > 7)
|| (column1.colType.colDataType == CalpontSystemCatalog::BLOB
&& column1.colType.colWidth > 7)
|| (column1.colType.colDataType == CalpontSystemCatalog::DECIMAL
&& column1.colType.precision > 18)
|| (column1.colType.colDataType == CalpontSystemCatalog::UDECIMAL
@@ -4170,7 +4187,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream&
dictStruct.dctnryOid = column1.colType.ddn.dictOID;
dictStruct.columnOid = column1.colType.columnOID;
WriteEngine::DctnryTuple dictTuple;
memcpy(dictTuple.sigValue, colNewName.c_str(), colNewName.length());
dictTuple.sigValue = (unsigned char*)colNewName.c_str();
dictTuple.sigSize = colNewName.length();
dictTuple.isNull = false;
int error = NO_ERROR;
@@ -4221,7 +4238,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream&
colValuesList.push_back(aColList1);
WriteEngine::DctnryTuple dctnryTuple;
boost::to_lower(colNewName);
memcpy(dctnryTuple.sigValue, colNewName.c_str(), colNewName.length());
dctnryTuple.sigValue = (unsigned char*)colNewName.c_str();
dctnryTuple.sigSize = colNewName.length();
dctnryTuple.isNull = false;
dctColList = dctnryTuple;
@@ -4336,6 +4353,8 @@ uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream&
&& column5.colType.colWidth > 7)
|| (column5.colType.colDataType == CalpontSystemCatalog::VARBINARY
&& column5.colType.colWidth > 7)
|| (column5.colType.colDataType == CalpontSystemCatalog::BLOB
&& column5.colType.colWidth > 7)
|| (column5.colType.colDataType == CalpontSystemCatalog::DECIMAL
&& column5.colType.precision > 18)
|| (column5.colType.colDataType == CalpontSystemCatalog::UDECIMAL
@@ -4369,7 +4388,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream&
else
{
WriteEngine::DctnryTuple dictTuple;
memcpy(dictTuple.sigValue, defaultvalue.c_str(), defaultvalue.length());
dictTuple.sigValue = (unsigned char*)defaultvalue.c_str();
dictTuple.sigSize = defaultvalue.length();
dictTuple.isNull = false;
int error = NO_ERROR;
@@ -4418,7 +4437,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream&
if(defaultvalue.length() > 0)
{
memcpy(dctnryTuple.sigValue, defaultvalue.c_str(), defaultvalue.length());
dctnryTuple.sigValue = (unsigned char*)defaultvalue.c_str();
dctnryTuple.sigSize = defaultvalue.length();
dctnryTuple.isNull = false;
}

View File

@@ -316,6 +316,7 @@ inline boost::any getNullValueForType(const execplan::CalpontSystemCatalog::ColT
}
break;
case execplan::CalpontSystemCatalog::BLOB:
case execplan::CalpontSystemCatalog::VARBINARY:
{
std::string charnull;

View File

@@ -2027,7 +2027,7 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
nameNeeded = true;
}
WriteEngine::DctnryTuple dctTuple;
memcpy(dctTuple.sigValue, value.c_str(), value.length());
dctTuple.sigValue = (unsigned char*)value.c_str();
dctTuple.sigSize = value.length();
dctTuple.isNull = false;
@@ -2071,6 +2071,7 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
break;
}
case CalpontSystemCatalog::VARBINARY:
case CalpontSystemCatalog::BLOB:
{
value = row.getVarBinaryStringField(fetchColPos);
break;
@@ -2203,7 +2204,7 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
}
WriteEngine::DctnryTuple dctTuple;
memcpy(dctTuple.sigValue, value.c_str(), value.length());
dctTuple.sigValue = (unsigned char*)value.c_str();
dctTuple.sigSize = value.length();
dctTuple.isNull = false;
@@ -2253,7 +2254,7 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
nameNeeded = true;
}
WriteEngine::DctnryTuple dctTuple;
memcpy(dctTuple.sigValue, value.c_str(), value.length());
dctTuple.sigValue = (unsigned char*)value.c_str();
dctTuple.sigSize = value.length();
dctTuple.isNull = false;
error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType);
@@ -2293,7 +2294,7 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
nameNeeded = true;
}
WriteEngine::DctnryTuple dctTuple;
memcpy(dctTuple.sigValue, value.c_str(), value.length());
dctTuple.sigValue = (unsigned char*)value.c_str();
dctTuple.sigSize = value.length();
dctTuple.isNull = false;
error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType);
@@ -2356,6 +2357,7 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
break;
}
case CalpontSystemCatalog::VARBINARY:
case CalpontSystemCatalog::BLOB:
{
value = row.getVarBinaryStringField(fetchColPos);
break;

View File

@@ -110,7 +110,8 @@ class WE_DMLCommandProc
|| ((colType.colDataType == execplan::CalpontSystemCatalog::VARCHAR) && (colType.colWidth > 7))
|| ((colType.colDataType == execplan::CalpontSystemCatalog::DECIMAL) && (colType.precision > 18))
|| ((colType.colDataType == execplan::CalpontSystemCatalog::UDECIMAL) && (colType.precision > 18))
|| (colType.colDataType == execplan::CalpontSystemCatalog::VARBINARY))
|| (colType.colDataType == execplan::CalpontSystemCatalog::VARBINARY)
|| (colType.colDataType == execplan::CalpontSystemCatalog::BLOB))
{
return true;
}

View File

@@ -1587,6 +1587,9 @@ int ChunkManager::calculateHeaderSize(int width)
int rowsPerExtent = BRMWrapper::getInstance()->getExtentRows();
int rowsPerFile = rowsPerExtent * extentsPerFile;
int stringsPerBlock = 8180 / (width + 2); // 8180 = 8192 - 12
// BLOB is 1 string per block
if (stringsPerBlock == 0)
stringsPerBlock = 1;
int blocksNeeded = rowsPerFile / stringsPerBlock;
int blocksPerChunk = UNCOMPRESSED_CHUNK_SIZE / BYTE_PER_BLOCK;
lldiv_t chunks = lldiv(blocksNeeded, blocksPerChunk);

View File

@@ -135,7 +135,7 @@ WErrorCodes::WErrorCodes() : fErrorCodes()
// Dictionary error
fErrorCodes[ERR_DICT_NO_SPACE_INSERT] = " no space for a dictionary insert";
fErrorCodes[ERR_DICT_SIZE_GT_8000] = " the dictionary size was >8000";
fErrorCodes[ERR_DICT_SIZE_GT_2G] = " the dictionary size was > 2GB";
fErrorCodes[ERR_DICT_NO_OP_DELETE] = " in the dictionary no op delete";
fErrorCodes[ERR_DICT_NO_OFFSET_DELETE] = " a dictionary bad Delete offset";
fErrorCodes[ERR_DICT_INVALID_HDR] = " a dictionary bad Delete Hdr";

View File

@@ -231,7 +231,7 @@ namespace WriteEngine
// Dictionary error
//--------------------------------------------------------------------------
const int ERR_DICT_NO_SPACE_INSERT= ERR_DCTNRYBASE+ 1; // ins no space
const int ERR_DICT_SIZE_GT_8000 = ERR_DCTNRYBASE+ 2; // ins size >8000
const int ERR_DICT_SIZE_GT_2G = ERR_DCTNRYBASE+ 2; // ins size >8000
const int ERR_DICT_NO_OP_DELETE = ERR_DCTNRYBASE+ 3; // del no op
const int ERR_DICT_NO_OFFSET_DELETE=ERR_DCTNRYBASE+ 4; // del bad offset
const int ERR_DICT_INVALID_HDR = ERR_DCTNRYBASE+ 5; // Delete Hdr

View File

@@ -290,7 +290,7 @@ namespace WriteEngine
struct DctnryTuple /** @brief Dictionary Tuple struct*/
{
unsigned char sigValue[MAX_SIGNATURE_SIZE]; /** @brief dictionary signature value*/
unsigned char *sigValue; /** @brief dictionary signature value*/
int sigSize; /** @brief dictionary signature size */
Token token; /** @brief dictionary token */
bool isNull;

View File

@@ -45,12 +45,12 @@ namespace WriteEngine
struct Token {
uint64_t op : 10; // ordinal position within a block
uint64_t fbo : 36; // file block number
uint64_t spare : 18; // spare
uint64_t bc : 18; // block count
Token() // constructor, set to null value
{
op = 0x3FE;
fbo = 0xFFFFFFFFFLL;
spare = 0x3FFFF;
bc = 0x3FFFF;
}
};

View File

@@ -692,7 +692,7 @@ int ColumnOp::fillColumn(const TxnID& txnid, Column& column, Column& refCol, voi
dctnryStruct.colWidth = dictColWidth;
dctnryStruct.fCompressionType = column.compressionType;
DctnryTuple dctnryTuple;
memcpy(dctnryTuple.sigValue, defaultValStr.c_str(), defaultValStr.length());
dctnryTuple.sigValue = (unsigned char*)defaultValStr.c_str();
dctnryTuple.sigSize = defaultValStr.length();
rc = dctnry->openDctnry(dctnryStruct.dctnryOid,
@@ -761,7 +761,7 @@ int ColumnOp::fillColumn(const TxnID& txnid, Column& column, Column& refCol, voi
dctnryStruct.colWidth = dictColWidth;
dctnryStruct.fCompressionType = column.compressionType;
DctnryTuple dctnryTuple;
memcpy(dctnryTuple.sigValue, defaultValStr.c_str(), defaultValStr.length());
dctnryTuple.sigValue = (unsigned char*)defaultValStr.c_str();
//WriteEngineWrapper wrapper;
dctnryTuple.sigSize = defaultValStr.length();
//rc = wrapper.tokenize(txnid, dctnryStruct, dctnryTuple);
@@ -1454,6 +1454,7 @@ int ColumnOp::addExtent(
//pOldVal = &((double *) oldValArray)[i];
break;
case WriteEngine::WR_VARBINARY : // treat same as char for now
case WriteEngine::WR_BLOB :
case WriteEngine::WR_CHAR :
if (!bDelete)
{
@@ -1588,6 +1589,7 @@ int ColumnOp::addExtent(
//pOldVal = &((double *) oldValArray)[i];
break;
case WriteEngine::WR_VARBINARY : // treat same as char for now
case WriteEngine::WR_BLOB :
case WriteEngine::WR_CHAR :
if (!bDelete)
{
@@ -1717,6 +1719,7 @@ int ColumnOp::addExtent(
pVal = &((double *) valArray)[i];
break;
case WriteEngine::WR_VARBINARY : // treat same as char for now
case WriteEngine::WR_BLOB :
case WriteEngine::WR_CHAR :
{
memcpy(charTmpBuf, (char*)valArray+i*8, 8);

View File

@@ -624,7 +624,7 @@ int WriteEngineWrapper::fillColumn(const TxnID& txnid, const OID& dataOid,
DctnryTuple dctnryTuple;
DctColTupleList dctColTuples;
memcpy(dctnryTuple.sigValue, tmpStr.c_str(), tmpStr.length());
dctnryTuple.sigValue = (unsigned char*)tmpStr.c_str();
dctnryTuple.sigSize = tmpStr.length();
dctnryTuple.isNull = true;
dctColTuples.push_back (dctnryTuple);
@@ -1200,7 +1200,7 @@ timer.stop("allocRowId");
timer.start("tokenize");
#endif
DctnryTuple dctTuple;
memcpy(dctTuple.sigValue, dctStr_iter->c_str(), dctStr_iter->length());
dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
dctTuple.sigSize = dctStr_iter->length();
dctTuple.isNull = false;
rc = tokenize(txnid, dctTuple, dctnryStructList[i].fCompressionType);
@@ -1248,7 +1248,7 @@ timer.stop("tokenize");
timer.start("tokenize");
#endif
DctnryTuple dctTuple;
memcpy(dctTuple.sigValue, dctStr_iter->c_str(), dctStr_iter->length());
dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
dctTuple.sigSize = dctStr_iter->length();
dctTuple.isNull = false;
rc = tokenize(txnid, dctTuple, newDctnryStructList[i].fCompressionType);
@@ -1710,7 +1710,7 @@ timer.start("allocRowId");
timer.start("tokenize");
#endif
DctnryTuple dctTuple;
memcpy(dctTuple.sigValue, dctStr_iter->c_str(), dctStr_iter->length());
dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
dctTuple.sigSize = dctStr_iter->length();
dctTuple.isNull = false;
rc = tokenize(txnid, dctTuple, dctnryStructList[i].fCompressionType);
@@ -1776,7 +1776,7 @@ timer.stop("tokenize");
timer.start("tokenize");
#endif
DctnryTuple dctTuple;
memcpy(dctTuple.sigValue, dctStr_iter->c_str(), dctStr_iter->length());
dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
dctTuple.sigSize = dctStr_iter->length();
dctTuple.isNull = false;
rc = tokenize(txnid, dctTuple, newDctnryStructList[i].fCompressionType);
@@ -2333,7 +2333,7 @@ timer.stop("allocRowId");
timer.start("tokenize");
#endif
DctnryTuple dctTuple;
memcpy(dctTuple.sigValue, dctStr_iter->c_str(), dctStr_iter->length());
dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
dctTuple.sigSize = dctStr_iter->length();
dctTuple.isNull = false;
rc = tokenize(txnid,
@@ -2403,7 +2403,7 @@ timer.stop("tokenize");
timer.start("tokenize");
#endif
DctnryTuple dctTuple;
memcpy(dctTuple.sigValue, dctStr_iter->c_str(), dctStr_iter->length());
dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
dctTuple.sigSize = dctStr_iter->length();
dctTuple.isNull = false;
rc = tokenize(txnid,