diff --git a/dbcon/ddlpackage/ddl.l b/dbcon/ddlpackage/ddl.l index 64fa09356..ac51fe020 100644 --- a/dbcon/ddlpackage/ddl.l +++ b/dbcon/ddlpackage/ddl.l @@ -156,6 +156,14 @@ EXISTS {return EXISTS;} CHANGE {return CHANGE;} TRUNCATE {return TRUNCATE;} VARBINARY {return VARBINARY;} +TINYBLOB {return TINYBLOB;} +BLOB {return BLOB;} +MEDIUMBLOB {return MEDIUMBLOB;} +LONGBLOB {return LONGBLOB;} +TINYTEXT {return TINYTEXT;} +TEXT {return TEXT;} +MEDIUMTEXT {return MEDIUMTEXT;} +LONGTEXT {return LONGTEXT;} \n { lineno++;} diff --git a/dbcon/ddlpackage/ddl.y b/dbcon/ddlpackage/ddl.y index 7516c2b8e..ba3bf9a7c 100644 --- a/dbcon/ddlpackage/ddl.y +++ b/dbcon/ddlpackage/ddl.y @@ -110,12 +110,14 @@ char* copy_string(const char *str); %} -%token ACTION ADD ALTER AUTO_INCREMENT BIGINT BIT IDB_BLOB CASCADE IDB_CHAR CHARACTER CHECK CLOB COLUMN +%token ACTION ADD ALTER AUTO_INCREMENT BIGINT BIT BLOB IDB_BLOB CASCADE IDB_CHAR +CHARACTER CHECK CLOB COLUMN COLUMNS COMMENT CONSTRAINT CONSTRAINTS CREATE CURRENT_USER DATETIME DEC DECIMAL DEFAULT DEFERRABLE DEFERRED IDB_DELETE DROP ENGINE -FOREIGN FULL IMMEDIATE INDEX INITIALLY IDB_INT INTEGER KEY MATCH MAX_ROWS +FOREIGN FULL IMMEDIATE INDEX INITIALLY IDB_INT INTEGER KEY LONGBLOB LONGTEXT +MATCH MAX_ROWS MEDIUMBLOB MEDIUMTEXT MIN_ROWS MODIFY NO NOT NULL_TOK NUMBER NUMERIC ON PARTIAL PRECISION PRIMARY -REFERENCES RENAME RESTRICT SET SMALLINT TABLE TIME +REFERENCES RENAME RESTRICT SET SMALLINT TABLE TEXT TIME TINYBLOB TINYTEXT TINYINT TO UNIQUE UNSIGNED UPDATE USER SESSION_USER SYSTEM_USER VARCHAR VARBINARY VARYING WITH ZONE DOUBLE IDB_FLOAT REAL CHARSET IDB_IF EXISTS CHANGE TRUNCATE @@ -136,6 +138,8 @@ VARYING WITH ZONE DOUBLE IDB_FLOAT REAL CHARSET IDB_IF EXISTS CHANGE TRUNCATE %type ata_rename_table %type character_string_type %type binary_string_type +%type blob_type +%type text_type %type check_constraint_def %type column_constraint %type column_constraint_def @@ -704,6 +708,8 @@ data_type: | binary_string_type | numeric_type | datetime_type + | blob_type + | text_type | IDB_BLOB { $$ = new ColumnType(DDL_BLOB); @@ -865,6 +871,62 @@ binary_string_type: } ; +blob_type: + BLOB '(' ICONST ')' + { + $$ = new ColumnType(DDL_BLOB); + $$->fLength = atol($3); + } + | BLOB + { + $$ = new ColumnType(DDL_BLOB); + $$->fLength = 65535; + } + | TINYBLOB + { + $$ = new ColumnType(DDL_BLOB); + $$->fLength = 255; + } + | MEDIUMBLOB + { + $$ = new ColumnType(DDL_BLOB); + $$->fLength = 16777215; + } + | LONGBLOB + { + $$ = new ColumnType(DDL_BLOB); + $$->fLength = 2100000000; + } + ; + +text_type: + TEXT '(' ICONST ')' + { + $$ = new ColumnType(DDL_BLOB); + $$->fLength = atol($3); + } + | TEXT + { + $$ = new ColumnType(DDL_BLOB); + $$->fLength = 65535; + } + | TINYTEXT + { + $$ = new ColumnType(DDL_BLOB); + $$->fLength = 255; + } + | MEDIUMTEXT + { + $$ = new ColumnType(DDL_BLOB); + $$->fLength = 16777215; + } + | LONGTEXT + { + $$ = new ColumnType(DDL_BLOB); + $$->fLength = 2100000000; + } + ; + numeric_type: exact_numeric_type | approximate_numeric_type diff --git a/dbcon/ddlpackage/ddlpkg.h b/dbcon/ddlpackage/ddlpkg.h index 1de5e7c13..585f2c9d5 100644 --- a/dbcon/ddlpackage/ddlpkg.h +++ b/dbcon/ddlpackage/ddlpkg.h @@ -1008,7 +1008,7 @@ struct ColumnType int fType; /** @brief Length of datatype in bytes */ - int fLength; + long fLength; /** @brief SQL precision. This is the number of digits in the representation. */ int fPrecision; diff --git a/dbcon/ddlpackageproc/altertableprocessor.cpp b/dbcon/ddlpackageproc/altertableprocessor.cpp index df2d77eb8..d89d648b2 100644 --- a/dbcon/ddlpackageproc/altertableprocessor.cpp +++ b/dbcon/ddlpackageproc/altertableprocessor.cpp @@ -561,7 +561,8 @@ void AlterTableProcessor::addColumn (uint32_t sessionID, execplan::CalpontSystem if ((columnDefPtr->fType->fType == CalpontSystemCatalog::CHAR && columnDefPtr->fType->fLength > 8) || (columnDefPtr->fType->fType == CalpontSystemCatalog::VARCHAR && columnDefPtr->fType->fLength > 7) || - (columnDefPtr->fType->fType == CalpontSystemCatalog::VARBINARY && columnDefPtr->fType->fLength > 7)) + (columnDefPtr->fType->fType == CalpontSystemCatalog::VARBINARY && columnDefPtr->fType->fLength > 7) || + (columnDefPtr->fType->fType == CalpontSystemCatalog::BLOB)) { isDict = true; } diff --git a/dbcon/ddlpackageproc/createtableprocessor.cpp b/dbcon/ddlpackageproc/createtableprocessor.cpp index ba1a780a9..8e9e85039 100644 --- a/dbcon/ddlpackageproc/createtableprocessor.cpp +++ b/dbcon/ddlpackageproc/createtableprocessor.cpp @@ -244,7 +244,8 @@ keepGoing: dataType = convertDataType(tableDef.fColumns[i]->fType->fType); if ( (dataType == CalpontSystemCatalog::CHAR && tableDef.fColumns[i]->fType->fLength > 8) || (dataType == CalpontSystemCatalog::VARCHAR && tableDef.fColumns[i]->fType->fLength > 7) || - (dataType == CalpontSystemCatalog::VARBINARY && tableDef.fColumns[i]->fType->fLength > 7) ) + (dataType == CalpontSystemCatalog::VARBINARY && tableDef.fColumns[i]->fType->fLength > 7) || + (dataType == CalpontSystemCatalog::BLOB && tableDef.fColumns[i]->fType->fLength > 7) ) numDictCols++; } fStartingColOID = fObjectIDManager.allocOIDs(numColumns+numDictCols+1); //include column, oids,dictionary oids and tableoid @@ -533,7 +534,8 @@ cout << fTxnid.id << " Create table WE_SVR_WRITE_CREATE_SYSCOLUMN: " << errorMsg bytestream << (uint32_t) colDefPtr->fType->fCompressiontype; if ( (dataType == CalpontSystemCatalog::CHAR && colDefPtr->fType->fLength > 8) || (dataType == CalpontSystemCatalog::VARCHAR && colDefPtr->fType->fLength > 7) || - (dataType == CalpontSystemCatalog::VARBINARY && colDefPtr->fType->fLength > 7) ) + (dataType == CalpontSystemCatalog::VARBINARY && colDefPtr->fType->fLength > 7) || + (dataType == CalpontSystemCatalog::BLOB && colDefPtr->fType->fLength > 7) ) { bytestream << (uint32_t) (fStartingColOID+numColumns+(dictNum++)+1); bytestream << (uint8_t) dataType; diff --git a/dbcon/ddlpackageproc/ddlindexpopulator.cpp b/dbcon/ddlpackageproc/ddlindexpopulator.cpp index e5062e04c..36d1168ad 100644 --- a/dbcon/ddlpackageproc/ddlindexpopulator.cpp +++ b/dbcon/ddlpackageproc/ddlindexpopulator.cpp @@ -282,7 +282,7 @@ namespace ddlpackageprocessor dictStruct.treeOid = colType.ddn.treeOID; dictStruct.listOid = colType.ddn.listOID; dictStruct.dctnryOid = colType.ddn.dictOID; - memcpy(dictTuple.sigValue, data.c_str(), data.length()); + dictTuple.sigValue = data.c_str(); dictTuple.sigSize = data.length(); int error = NO_ERROR; if ( NO_ERROR != (error = fWriteEngine->tokenize( fTxnID, dictStruct, dictTuple)) ) diff --git a/dbcon/ddlpackageproc/ddlpackageprocessor.cpp b/dbcon/ddlpackageproc/ddlpackageprocessor.cpp index 65f36c145..d3c3a7dc4 100644 --- a/dbcon/ddlpackageproc/ddlpackageprocessor.cpp +++ b/dbcon/ddlpackageproc/ddlpackageprocessor.cpp @@ -673,7 +673,7 @@ boost::any DDLPackageProcessor::tokenizeData(execplan::CalpontSystemCatalog::SCN //added for multifiles per oid dictStruct.columnOid = colType.columnOID; WriteEngine::DctnryTuple dictTuple; - memcpy(dictTuple.sigValue, str.c_str(), str.length()); + dictTuple.sigValue = (unsigned char*)str.c_str(); dictTuple.sigSize = str.length(); int error = NO_ERROR; if (NO_ERROR != (error = fWriteEngine.tokenize(txnID, dictStruct, dictTuple, false))) // @bug 5572 HDFS tmp file diff --git a/dbcon/dmlpackageproc/dmlpackageprocessor.cpp b/dbcon/dmlpackageproc/dmlpackageprocessor.cpp index d35b0a403..2d1a39512 100644 --- a/dbcon/dmlpackageproc/dmlpackageprocessor.cpp +++ b/dbcon/dmlpackageproc/dmlpackageprocessor.cpp @@ -131,7 +131,7 @@ boost::any DMLPackageProcessor::tokenizeData( execplan::CalpontSystemCatalog::SC dictStruct.dctnryOid = colType.ddn.dictOID; //cout << "Dictionary OIDs: " << colType.ddn.treeOID << " " << colType.ddn.listOID << endl; WriteEngine::DctnryTuple dictTuple; - memcpy(dictTuple.sigValue, data.c_str(), data.length()); + dictTuple.sigValue = data.c_str(); dictTuple.sigSize = data.length(); int error = NO_ERROR; if ( NO_ERROR != (error = fWriteEngine.tokenize( txnID, dictStruct, dictTuple)) ) diff --git a/dbcon/dmlpackageproc/dmlpackageprocessor.h b/dbcon/dmlpackageproc/dmlpackageprocessor.h index 2b93c1300..b30285c43 100644 --- a/dbcon/dmlpackageproc/dmlpackageprocessor.h +++ b/dbcon/dmlpackageproc/dmlpackageprocessor.h @@ -443,7 +443,8 @@ protected: if (((colType.colDataType == execplan::CalpontSystemCatalog::CHAR) && (colType.colWidth > 8)) || ((colType.colDataType == execplan::CalpontSystemCatalog::VARCHAR) && (colType.colWidth > 7)) || ((colType.colDataType == execplan::CalpontSystemCatalog::DECIMAL) && (colType.precision > 18)) - || (colType.colDataType == execplan::CalpontSystemCatalog::VARBINARY)) + || (colType.colDataType == execplan::CalpontSystemCatalog::VARBINARY) + || (colType.colDataType == execplan::CalpontSystemCatalog::BLOB)) { return true; } diff --git a/dbcon/execplan/calpontsystemcatalog.h b/dbcon/execplan/calpontsystemcatalog.h index 2936b2daa..3bb912de5 100644 --- a/dbcon/execplan/calpontsystemcatalog.h +++ b/dbcon/execplan/calpontsystemcatalog.h @@ -904,7 +904,9 @@ const CalpontSystemCatalog::TableAliasName make_aliasview(const std::string& s, */ inline bool isCharType(const execplan::CalpontSystemCatalog::ColDataType type) { - return (execplan::CalpontSystemCatalog::VARCHAR == type || execplan::CalpontSystemCatalog::CHAR == type); + return (execplan::CalpontSystemCatalog::VARCHAR == type || + execplan::CalpontSystemCatalog::CHAR == type || + execplan::CalpontSystemCatalog::BLOB == type); } /** convenience function to determine if column type is an diff --git a/dbcon/execplan/simplecolumn.cpp b/dbcon/execplan/simplecolumn.cpp index bfbf96a2c..3c641804c 100644 --- a/dbcon/execplan/simplecolumn.cpp +++ b/dbcon/execplan/simplecolumn.cpp @@ -583,6 +583,7 @@ void SimpleColumn::evaluate(Row& row, bool& isNull) break; } case CalpontSystemCatalog::VARBINARY: + case CalpontSystemCatalog::BLOB: { fResult.strVal = row.getVarBinaryStringField(fInputIndex); break; diff --git a/dbcon/execplan/simplefilter.cpp b/dbcon/execplan/simplefilter.cpp index 692da7513..55b22cdb2 100644 --- a/dbcon/execplan/simplefilter.cpp +++ b/dbcon/execplan/simplefilter.cpp @@ -176,6 +176,7 @@ const string SimpleFilter::data() const if (dynamic_cast(fRhs) && (fRhs->resultType().colDataType == CalpontSystemCatalog::VARCHAR || fRhs->resultType().colDataType == CalpontSystemCatalog::CHAR || + fRhs->resultType().colDataType == CalpontSystemCatalog::BLOB || fRhs->resultType().colDataType == CalpontSystemCatalog::VARBINARY || fRhs->resultType().colDataType == CalpontSystemCatalog::DATE || fRhs->resultType().colDataType == CalpontSystemCatalog::DATETIME)) @@ -185,6 +186,7 @@ const string SimpleFilter::data() const if (dynamic_cast(fLhs) && (fLhs->resultType().colDataType == CalpontSystemCatalog::VARCHAR || fLhs->resultType().colDataType == CalpontSystemCatalog::CHAR || + fLhs->resultType().colDataType == CalpontSystemCatalog::BLOB || fLhs->resultType().colDataType == CalpontSystemCatalog::VARBINARY || fLhs->resultType().colDataType == CalpontSystemCatalog::DATE || fLhs->resultType().colDataType == CalpontSystemCatalog::DATETIME)) diff --git a/dbcon/execplan/treenode.h b/dbcon/execplan/treenode.h index db6302530..8fdcc6076 100644 --- a/dbcon/execplan/treenode.h +++ b/dbcon/execplan/treenode.h @@ -394,6 +394,7 @@ inline bool TreeNode::getBoolVal() return (atoi(fResult.strVal.c_str()) != 0); //FIXME: Huh??? case CalpontSystemCatalog::VARBINARY: + case CalpontSystemCatalog::BLOB: if (fResultType.colWidth <= 7) return (atoi((char*)(&fResult.origIntVal)) != 0); return (atoi(fResult.strVal.c_str()) != 0); @@ -440,6 +441,7 @@ inline const std::string& TreeNode::getStrVal() break; //FIXME: ??? case CalpontSystemCatalog::VARBINARY: + case CalpontSystemCatalog::BLOB: if (fResultType.colWidth <= 7) fResult.strVal = (char*)(&fResult.origIntVal); break; @@ -573,6 +575,7 @@ inline int64_t TreeNode::getIntVal() return atoll(fResult.strVal.c_str()); //FIXME: ??? case CalpontSystemCatalog::VARBINARY: + case CalpontSystemCatalog::BLOB: if (fResultType.colWidth <= 7) return fResult.intVal; return atoll(fResult.strVal.c_str()); @@ -656,6 +659,7 @@ inline float TreeNode::getFloatVal() return atof(fResult.strVal.c_str()); //FIXME: ??? case CalpontSystemCatalog::VARBINARY: + case CalpontSystemCatalog::BLOB: if (fResultType.colWidth <= 7) return atof((char*)(&fResult.origIntVal)); return atof(fResult.strVal.c_str()); @@ -703,6 +707,7 @@ inline double TreeNode::getDoubleVal() return strtod(fResult.strVal.c_str(), NULL); //FIXME: ??? case CalpontSystemCatalog::VARBINARY: + case CalpontSystemCatalog::BLOB: if (fResultType.colWidth <= 7) return strtod((char*)(&fResult.origIntVal), NULL); return strtod(fResult.strVal.c_str(), NULL); @@ -746,6 +751,7 @@ inline IDB_Decimal TreeNode::getDecimalVal() case CalpontSystemCatalog::VARCHAR: throw logging::InvalidConversionExcept("TreeNode::getDecimalVal: non-support conversion from string"); case CalpontSystemCatalog::VARBINARY: + case CalpontSystemCatalog::BLOB: throw logging::InvalidConversionExcept("TreeNode::getDecimalVal: non-support conversion from binary string"); case CalpontSystemCatalog::BIGINT: case CalpontSystemCatalog::MEDINT: diff --git a/dbcon/joblist/crossenginestep.cpp b/dbcon/joblist/crossenginestep.cpp index 9abfcfe11..aeb270ad2 100644 --- a/dbcon/joblist/crossenginestep.cpp +++ b/dbcon/joblist/crossenginestep.cpp @@ -231,7 +231,7 @@ void CrossEngineStep::makeMappings() } -void CrossEngineStep::setField(int i, const char* value, Row& row) +void CrossEngineStep::setField(int i, const char* value, unsigned long length, Row& row) { CalpontSystemCatalog::ColDataType colType = row.getColType(i); @@ -243,6 +243,13 @@ void CrossEngineStep::setField(int i, const char* value, Row& row) else row.setStringField("", i); } + else if ((colType == CalpontSystemCatalog::BLOB) || (colType == CalpontSystemCatalog::VARBINARY)) + { + if (value != NULL) + row.setVarBinaryField((const uint8_t*)value, length, i); + else + row.setVarBinaryField(NULL, 0, i); + } else { CalpontSystemCatalog::ColType ct; @@ -484,7 +491,7 @@ void CrossEngineStep::execute() while ((rowIn = mysql->nextRow()) && !cancelled()) { for(int i = 0; i < num_fields; i++) - setField(i, rowIn[i], fRowDelivered); + setField(i, rowIn[i], mysql->getFieldLength(i), fRowDelivered); addRow(rgDataDelivered); } @@ -504,7 +511,7 @@ void CrossEngineStep::execute() for(int i = 0; i < num_fields; i++) { if (fFe1Column[i] != -1) - setField(fFe1Column[i], rowIn[i], rowFe1); + setField(fFe1Column[i], rowIn[i], mysql->getFieldLength(i), rowFe1); } if (fFeFilters && fFeInstance->evaluate(rowFe1, fFeFilters.get()) == false) @@ -518,7 +525,7 @@ void CrossEngineStep::execute() for(int i = 0; i < num_fields; i++) { if (fFe1Column[i] == -1) - setField(i, rowIn[i], fRowDelivered); + setField(i, rowIn[i], mysql->getFieldLength(i), fRowDelivered); } addRow(rgDataDelivered); @@ -536,7 +543,7 @@ void CrossEngineStep::execute() while ((rowIn = mysql->nextRow()) && !cancelled()) { for(int i = 0; i < num_fields; i++) - setField(i, rowIn[i], rowFe3); + setField(i, rowIn[i], mysql->getFieldLength(i), rowFe3); fFeInstance->evaluate(rowFe3, fFeSelects); fFeInstance->evaluate(rowFe3, fFeSelects); @@ -567,7 +574,7 @@ void CrossEngineStep::execute() for(int i = 0; i < num_fields; i++) { if (fFe1Column[i] != -1) - setField(fFe1Column[i], rowIn[i], rowFe1); + setField(fFe1Column[i], rowIn[i], mysql->getFieldLength(i), rowFe1); } if (fFeFilters && fFeInstance->evaluate(rowFe1, fFeFilters.get()) == false) @@ -581,7 +588,7 @@ void CrossEngineStep::execute() for(int i = 0; i < num_fields; i++) { if (fFe1Column[i] == -1) - setField(i, rowIn[i], rowFe3); + setField(i, rowIn[i], mysql->getFieldLength(i), rowFe3); } fFeInstance->evaluate(rowFe3, fFeSelects); diff --git a/dbcon/joblist/crossenginestep.h b/dbcon/joblist/crossenginestep.h index bdbba656b..c198efbef 100644 --- a/dbcon/joblist/crossenginestep.h +++ b/dbcon/joblist/crossenginestep.h @@ -60,13 +60,20 @@ public: int getFieldCount() { return mysql_num_fields(fRes); } int getRowCount() { return mysql_num_rows(fRes); } - char** nextRow() { return mysql_fetch_row(fRes); } + char** nextRow() + { + char** row = mysql_fetch_row(fRes); + fieldLengths = mysql_fetch_lengths(fRes); + return row; + } + long getFieldLength(int field) { return fieldLengths[field]; } const std::string& getError() { return fErrStr; } private: MYSQL* fCon; MYSQL_RES* fRes; std::string fErrStr; + unsigned long *fieldLengths; }; /** @brief class CrossEngineStep @@ -150,7 +157,7 @@ protected: virtual void makeMappings(); virtual void addFilterStr(const std::vector&, const std::string&); virtual std::string makeQuery(); - virtual void setField(int, const char*, rowgroup::Row&); + virtual void setField(int, const char*, unsigned long, rowgroup::Row&); inline void addRow(rowgroup::RGData &); //inline void addRow(boost::shared_array&); virtual int64_t convertValueNum( diff --git a/dbcon/joblist/expressionstep.cpp b/dbcon/joblist/expressionstep.cpp index 96616ee0b..6eae7872a 100644 --- a/dbcon/joblist/expressionstep.cpp +++ b/dbcon/joblist/expressionstep.cpp @@ -324,7 +324,8 @@ void ExpressionStep::addColumn(ReturnedColumn* rc, JobInfo& jobInfo) void ExpressionStep::populateColumnInfo(ReturnedColumn* rc, JobInfo& jobInfo) { // As of bug3695, make sure varbinary is not used in function expression. - if (rc->resultType().colDataType == CalpontSystemCatalog::VARBINARY && !fVarBinOK) + if ((rc->resultType().colDataType == CalpontSystemCatalog::VARBINARY || + rc->resultType().colDataType == CalpontSystemCatalog::BLOB) && !fVarBinOK) throw runtime_error("VARBINARY in filter or function is not supported."); SimpleColumn* sc = dynamic_cast(rc); @@ -344,7 +345,8 @@ void ExpressionStep::populateColumnInfo(ReturnedColumn* rc, JobInfo& jobInfo) void ExpressionStep::populateColumnInfo(SimpleColumn* sc, JobInfo& jobInfo) { // As of bug3695, make sure varbinary is not used in function expression. - if (sc->resultType().colDataType == CalpontSystemCatalog::VARBINARY && !fVarBinOK) + if ((sc->resultType().colDataType == CalpontSystemCatalog::VARBINARY || + sc->resultType().colDataType == CalpontSystemCatalog::BLOB) && !fVarBinOK) throw runtime_error ("VARBINARY in filter or function is not supported."); CalpontSystemCatalog::OID tblOid = joblist::tableOid(sc, jobInfo.csc); @@ -409,7 +411,8 @@ void ExpressionStep::populateColumnInfo(SimpleColumn* sc, JobInfo& jobInfo) void ExpressionStep::populateColumnInfo(WindowFunctionColumn* wc, JobInfo& jobInfo) { // As of bug3695, make sure varbinary is not used in function expression. - if (wc->resultType().colDataType == CalpontSystemCatalog::VARBINARY && !fVarBinOK) + if ((wc->resultType().colDataType == CalpontSystemCatalog::VARBINARY || + wc->resultType().colDataType == CalpontSystemCatalog::BLOB) && !fVarBinOK) throw runtime_error("VARBINARY in filter or function is not supported."); // This is for window function in IN/EXISTS sub-query. @@ -434,7 +437,8 @@ void ExpressionStep::populateColumnInfo(WindowFunctionColumn* wc, JobInfo& jobIn void ExpressionStep::populateColumnInfo(AggregateColumn* ac, JobInfo& jobInfo) { // As of bug3695, make sure varbinary is not used in function expression. - if (ac->resultType().colDataType == CalpontSystemCatalog::VARBINARY && !fVarBinOK) + if ((ac->resultType().colDataType == CalpontSystemCatalog::VARBINARY || + ac->resultType().colDataType == CalpontSystemCatalog::BLOB) && !fVarBinOK) throw runtime_error("VARBINARY in filter or function is not supported."); // This is for aggregate function in IN/EXISTS sub-query. diff --git a/dbcon/joblist/pcolscan.cpp b/dbcon/joblist/pcolscan.cpp index 7a5927b75..455ed97ba 100644 --- a/dbcon/joblist/pcolscan.cpp +++ b/dbcon/joblist/pcolscan.cpp @@ -171,7 +171,8 @@ pColScanStep::pColScanStep( } //If this is a dictionary column, fudge the numbers... - if (fColType.colDataType == CalpontSystemCatalog::VARBINARY) + if ((fColType.colDataType == CalpontSystemCatalog::VARBINARY) + || (fColType.colDataType == CalpontSystemCatalog::BLOB)) { fColType.colWidth = 8; fIsDict = true; diff --git a/dbcon/joblist/pcolstep.cpp b/dbcon/joblist/pcolstep.cpp index 93680cd33..7bb46a182 100644 --- a/dbcon/joblist/pcolstep.cpp +++ b/dbcon/joblist/pcolstep.cpp @@ -169,7 +169,8 @@ pColStep::pColStep( } //If this is a dictionary column, fudge the numbers... - if (fColType.colDataType == CalpontSystemCatalog::VARBINARY) + if ((fColType.colDataType == CalpontSystemCatalog::VARBINARY) + || (fColType.colDataType == CalpontSystemCatalog::BLOB)) { fColType.colWidth = 8; fIsDict = true; diff --git a/dbcon/mysql/ha_calpont_dml.cpp b/dbcon/mysql/ha_calpont_dml.cpp index e92a76e09..d95a6b053 100755 --- a/dbcon/mysql/ha_calpont_dml.cpp +++ b/dbcon/mysql/ha_calpont_dml.cpp @@ -1585,6 +1585,49 @@ int ha_calpont_impl_write_batch_row_(uchar *buf, TABLE* table, cal_impl_if::cal_ buf += ci.columnTypes[colpos].colWidth; break; } + case CalpontSystemCatalog::BLOB: + { + int dataLength = 0; + uintptr_t *dataptr; + uchar *ucharptr; + + if (ci.columnTypes[colpos].colWidth < 256) + { + dataLength = *(int8_t*) buf; + buf++; + } + else if (ci.columnTypes[colpos].colWidth < 65535) + { + dataLength = *(int16_t*) buf; + buf = buf + 2 ; + } + else if (ci.columnTypes[colpos].colWidth < 16777216) + { + dataLength = *(int32_t*) buf; + buf = buf + 3 ; + } + else + { + dataLength = *(int32_t*) buf; + buf = buf + 4 ; + } + + // buf contains pointer to blob, for example: + // (gdb) p (char*)*(uintptr_t*)buf + // $43 = 0x7f68500c58f8 "hello world" + + dataptr = (uintptr_t*)buf; + ucharptr = (uchar*)*dataptr; + buf+= sizeof(uintptr_t); + for (int32_t i=0; ifield_type() == MYSQL_TYPE_BLOB) - { - throw runtime_error ("BLOB/TEXT data types are not supported by ColumnStore."); - } + if (item->field_type() == MYSQL_TYPE_BLOB) + { + ct.colDataType = CalpontSystemCatalog::BLOB; + } } break; /* FIXME: diff --git a/dbcon/mysql/ha_calpont_impl.cpp b/dbcon/mysql/ha_calpont_impl.cpp index ecfc3e83d..6715f0a19 100755 --- a/dbcon/mysql/ha_calpont_impl.cpp +++ b/dbcon/mysql/ha_calpont_impl.cpp @@ -467,6 +467,7 @@ int fetchNextRow(uchar *buf, cal_table_info& ti, cal_connection_info* ci) case CalpontSystemCatalog::CHAR: case CalpontSystemCatalog::VARCHAR: { + // TODO: use getStringPointer instead of getStringField to stop the string copies Field_varstring* f2 = (Field_varstring*)*f; switch (colType.colWidth) { @@ -622,6 +623,14 @@ int fetchNextRow(uchar *buf, cal_table_info& ti, cal_connection_info* ci) storeNumericField(f, intColVal, colType); break; } + case CalpontSystemCatalog::BLOB: + { + Field_blob *f2 = (Field_blob*)*f; + f2->set_ptr(row.getVarBinaryLength(s), (unsigned char*)row.getVarBinaryField(s)); + if ((*f)->null_ptr) + *(*f)->null_ptr &= ~(*f)->null_bit; + break; + } default: // treat as int64 { intColVal = row.getUintField<8>(s); diff --git a/primitives/linux-port/column.cpp b/primitives/linux-port/column.cpp index 7474239fd..7c57d15e9 100644 --- a/primitives/linux-port/column.cpp +++ b/primitives/linux-port/column.cpp @@ -249,6 +249,7 @@ inline bool isEmptyVal<8>(uint8_t type, const uint8_t* ival) case CalpontSystemCatalog::DATE: case CalpontSystemCatalog::DATETIME: case CalpontSystemCatalog::VARBINARY: + case CalpontSystemCatalog::BLOB: return (*val == joblist::CHAR8EMPTYROW); case CalpontSystemCatalog::UBIGINT: return (joblist::UBIGINTEMPTYROW == *val); @@ -271,6 +272,7 @@ inline bool isEmptyVal<4>(uint8_t type, const uint8_t* ival) return (joblist::FLOATEMPTYROW == *val); case CalpontSystemCatalog::CHAR: case CalpontSystemCatalog::VARCHAR: + case CalpontSystemCatalog::BLOB: case CalpontSystemCatalog::DATE: case CalpontSystemCatalog::DATETIME: return (joblist::CHAR4EMPTYROW == *val); @@ -292,6 +294,7 @@ inline bool isEmptyVal<2>(uint8_t type, const uint8_t* ival) { case CalpontSystemCatalog::CHAR: case CalpontSystemCatalog::VARCHAR: + case CalpontSystemCatalog::BLOB: case CalpontSystemCatalog::DATE: case CalpontSystemCatalog::DATETIME: return (joblist::CHAR2EMPTYROW == *val); @@ -313,6 +316,7 @@ inline bool isEmptyVal<1>(uint8_t type, const uint8_t* ival) { case CalpontSystemCatalog::CHAR: case CalpontSystemCatalog::VARCHAR: + case CalpontSystemCatalog::BLOB: case CalpontSystemCatalog::DATE: case CalpontSystemCatalog::DATETIME: return (*val == joblist::CHAR1EMPTYROW); @@ -343,6 +347,7 @@ inline bool isNullVal<8>(uint8_t type, const uint8_t* ival) case CalpontSystemCatalog::DATE: case CalpontSystemCatalog::DATETIME: case CalpontSystemCatalog::VARBINARY: + case CalpontSystemCatalog::BLOB: //@bug 339 might be a token here //TODO: what's up with the second const here? return (*val == joblist::CHAR8NULL || 0xFFFFFFFFFFFFFFFELL == *val); @@ -367,6 +372,7 @@ inline bool isNullVal<4>(uint8_t type, const uint8_t* ival) return (joblist::FLOATNULL == *val); case CalpontSystemCatalog::CHAR: case CalpontSystemCatalog::VARCHAR: + case CalpontSystemCatalog::BLOB: return (joblist::CHAR4NULL == *val); case CalpontSystemCatalog::DATE: case CalpontSystemCatalog::DATETIME: @@ -389,6 +395,7 @@ inline bool isNullVal<2>(uint8_t type, const uint8_t* ival) { case CalpontSystemCatalog::CHAR: case CalpontSystemCatalog::VARCHAR: + case CalpontSystemCatalog::BLOB: case CalpontSystemCatalog::DATE: case CalpontSystemCatalog::DATETIME: return (joblist::CHAR2NULL == *val); @@ -410,6 +417,7 @@ inline bool isNullVal<1>(uint8_t type, const uint8_t* ival) { case CalpontSystemCatalog::CHAR: case CalpontSystemCatalog::VARCHAR: + case CalpontSystemCatalog::BLOB: case CalpontSystemCatalog::DATE: case CalpontSystemCatalog::DATETIME: return (*val == joblist::CHAR1NULL); @@ -451,6 +459,7 @@ inline bool isMinMaxValid(const NewColRequestHeader *in) { case CalpontSystemCatalog::CHAR: return (in->DataSize<9); case CalpontSystemCatalog::VARCHAR: + case CalpontSystemCatalog::BLOB: return (in->DataSize<8); case CalpontSystemCatalog::TINYINT: case CalpontSystemCatalog::SMALLINT: @@ -505,7 +514,7 @@ inline bool colCompare(int64_t val1, int64_t val2, uint8_t COP, uint8_t rf, int return colCompare_(dVal1, dVal2, COP); } - else if ( (type == CalpontSystemCatalog::CHAR || type == CalpontSystemCatalog::VARCHAR) && !isNull ) + else if ( (type == CalpontSystemCatalog::CHAR || type == CalpontSystemCatalog::VARCHAR || type == CalpontSystemCatalog::BLOB) && !isNull ) { if (!regex.used && !rf) return colCompare_(order_swap(val1), order_swap(val2), COP); @@ -1180,7 +1189,7 @@ inline void p_Col_ridArray(NewColRequestHeader *in, if (out->ValidMinMax && !isNull && !isEmpty) { - if ((in->DataType == CalpontSystemCatalog::CHAR || in->DataType == CalpontSystemCatalog::VARCHAR ) && 1 < W) + if ((in->DataType == CalpontSystemCatalog::CHAR || in->DataType == CalpontSystemCatalog::VARCHAR || in->DataType == CalpontSystemCatalog::BLOB ) && 1 < W) { if (colCompare(out->Min, val, COMPARE_GT, false, in->DataType, W, placeholderRegex)) out->Min = val; diff --git a/primitives/linux-port/dictionary.cpp b/primitives/linux-port/dictionary.cpp index 3e00634af..0cc56df34 100644 --- a/primitives/linux-port/dictionary.cpp +++ b/primitives/linux-port/dictionary.cpp @@ -425,7 +425,12 @@ again: // is larger than the number of signatures in this block. Return a "special" string so that // the query keeps going, but that can be recognized as an internal error upon inspection. //@Bug 2534. Change the length check to 8000 - if (ret->len < 0 || ret->len > 8001) + + // MCOL-267: + // With BLOB support we have had to increase this to 8176 + // because a BLOB can take 8176 bytes of a dictionary block + // instead of the fixed 8000 with CHAR/VARCHAR + if (ret->len < 0 || ret->len > 8176) { ret->data = reinterpret_cast(signatureNotFound); ret->len = strlen(reinterpret_cast(ret->data)); diff --git a/primitives/primproc/columncommand.cpp b/primitives/primproc/columncommand.cpp index 8cad317b3..891f33373 100644 --- a/primitives/primproc/columncommand.cpp +++ b/primitives/primproc/columncommand.cpp @@ -866,6 +866,7 @@ const uint64_t ColumnCommand::getEmptyRowValue( const execplan::CalpontSystemCat case execplan::CalpontSystemCatalog::DATE : case execplan::CalpontSystemCatalog::DATETIME : case execplan::CalpontSystemCatalog::VARBINARY : + case execplan::CalpontSystemCatalog::BLOB : default: emptyVal = joblist::CHAR1EMPTYROW; if ( width == (2 + offset) ) diff --git a/primitives/primproc/dictstep.cpp b/primitives/primproc/dictstep.cpp index 029fa25df..b0fcbc007 100644 --- a/primitives/primproc/dictstep.cpp +++ b/primitives/primproc/dictstep.cpp @@ -332,7 +332,7 @@ void DictStep::_execute() i = 0; while (i < bpp->ridCount) { l_lbid = ((int64_t) newRidList[i].token) >> 10; - primMsg->LBID = l_lbid; + primMsg->LBID = (l_lbid == -1) ? l_lbid : l_lbid & 0xFFFFFFFFFL; primMsg->NVALS = 0; /* When this is used as a filter, the strings can be thrown out. JLF currently @@ -399,7 +399,7 @@ void DictStep::_project() i = 0; while (i < bpp->ridCount) { l_lbid = ((int64_t) newRidList[i].token) >> 10; - primMsg->LBID = l_lbid; + primMsg->LBID = (l_lbid == -1) ? l_lbid : l_lbid & 0xFFFFFFFFFL; primMsg->NVALS = 0; primMsg->OutputType = OT_DATAVALUE; pt = (OldGetSigParams *) (primMsg->tokens); @@ -435,7 +435,7 @@ void DictStep::_projectToRG(RowGroup &rg, uint32_t col) int64_t l_lbid=0; int64_t o_lbid=0; OldGetSigParams *pt; - StringPtr tmpStrings[LOGICAL_BLOCK_RIDS]; + StringPtr *tmpStrings = new StringPtr[LOGICAL_BLOCK_RIDS]; rowgroup::Row r; boost::scoped_array newRidList; @@ -456,7 +456,7 @@ void DictStep::_projectToRG(RowGroup &rg, uint32_t col) //cout << "DS: projectingToRG rids: " << bpp->ridCount << endl; while (i < bpp->ridCount) { l_lbid = ((int64_t) newRidList[i].token) >> 10; - primMsg->LBID = l_lbid; + primMsg->LBID = (l_lbid == -1) ? l_lbid : l_lbid & 0xFFFFFFFFFL; primMsg->NVALS = 0; primMsg->OutputType = OT_DATAVALUE; pt = (OldGetSigParams *) (primMsg->tokens); @@ -499,7 +499,7 @@ void DictStep::_projectToRG(RowGroup &rg, uint32_t col) } if (((int64_t)primMsg->LBID)<0 && o_lbid>0) - primMsg->LBID = o_lbid; + primMsg->LBID = o_lbid & 0xFFFFFFFFFL; memcpy(&pt[primMsg->NVALS], filterString.buf(), filterString.length()); issuePrimitive(false); @@ -509,7 +509,8 @@ void DictStep::_projectToRG(RowGroup &rg, uint32_t col) // bug 4901 - move this inside the loop and call incrementally // to save the unnecessary string copy - if (rg.getColTypes()[col] != execplan::CalpontSystemCatalog::VARBINARY) { + if ((rg.getColTypes()[col] != execplan::CalpontSystemCatalog::VARBINARY) && + (rg.getColTypes()[col] != execplan::CalpontSystemCatalog::BLOB)) { for (i = curResultCounter; i < tmpResultCounter; i++) { rg.getRow(newRidList[i].pos, &r); //cout << "serializing " << tmpStrings[i] << endl; @@ -517,9 +518,40 @@ void DictStep::_projectToRG(RowGroup &rg, uint32_t col) } } else { - for (i = curResultCounter; i < tmpResultCounter; i++) { + uint32_t firstTmpResultCounter = tmpResultCounter; + for (i = curResultCounter; i < firstTmpResultCounter; i++) { rg.getRow(newRidList[i].pos, &r); - r.setVarBinaryField(tmpStrings[i].ptr, tmpStrings[i].len, col); + // If this is a multi-block blob, get all the blocks + // We do string copy here, should maybe have a RowGroup + // function to append strings or something? + if (((newRidList[i].token >> 46) < 0x3FFFF) && + ((newRidList[i].token >> 46) > 0)) + { + StringPtr multi_part[1]; + uint16_t old_offset = primMsg->tokens[0].offset; + string *result = new string((char*)tmpStrings[i].ptr, tmpStrings[i].len); + uint64_t origin_lbid = primMsg->LBID; + uint32_t lbid_count = newRidList[i].token >> 46; + primMsg->tokens[0].offset = 1; // first offset of a sig + for (uint32_t j = 1; j <= lbid_count; j++) + { + tmpResultCounter = 0; + primMsg->LBID = origin_lbid + j; + primMsg->NVALS = 1; + primMsg->tokens[0].LBID = origin_lbid + j; + issuePrimitive(false); + projectResult(multi_part); + result->append((char*)multi_part[0].ptr, multi_part[0].len); + } + primMsg->tokens[0].offset = old_offset; + tmpResultCounter = firstTmpResultCounter; + r.setVarBinaryField((unsigned char*)result->c_str(), result->length(), col); + delete result; + } + else + { + r.setVarBinaryField(tmpStrings[i].ptr, tmpStrings[i].len, col); + } } } curResultCounter = tmpResultCounter; @@ -528,6 +560,7 @@ void DictStep::_projectToRG(RowGroup &rg, uint32_t col) //cout << "_projectToRG() total length = " << totalResultLength << endl; idbassert(tmpResultCounter == bpp->ridCount); + delete [] tmpStrings; //cout << "DS: /projectingToRG l: " << (int64_t)primMsg->LBID // << " len: " << tmpResultCounter // << endl; diff --git a/primitives/primproc/filtercommand.cpp b/primitives/primproc/filtercommand.cpp index 91a89a235..018fc7853 100644 --- a/primitives/primproc/filtercommand.cpp +++ b/primitives/primproc/filtercommand.cpp @@ -94,7 +94,8 @@ Command* FilterCommand::makeFilterCommand(ByteStream& bs, vector& cmds // char[] is stored as int, but cannot directly compare if length is different // due to endian issue if (cmd0->getColType().colDataType == execplan::CalpontSystemCatalog::CHAR || - cmd0->getColType().colDataType == execplan::CalpontSystemCatalog::VARCHAR) + cmd0->getColType().colDataType == execplan::CalpontSystemCatalog::VARCHAR || + cmd0->getColType().colDataType == execplan::CalpontSystemCatalog::BLOB) { StrFilterCmd* sc = new StrFilterCmd(); sc->setCompareFunc(CC); diff --git a/utils/funcexp/func_hex.cpp b/utils/funcexp/func_hex.cpp index 3d6630245..0af09feb4 100644 --- a/utils/funcexp/func_hex.cpp +++ b/utils/funcexp/func_hex.cpp @@ -96,6 +96,7 @@ string Func_hex::getStrVal(rowgroup::Row& row, break; } case CalpontSystemCatalog::VARBINARY: + case CalpontSystemCatalog::BLOB: { const string& arg = parm[0]->data()->getStrVal(row, isNull); uint64_t hexLen = arg.size() * 2; diff --git a/utils/funcexp/func_length.cpp b/utils/funcexp/func_length.cpp index 18a2f7232..25833d0a5 100644 --- a/utils/funcexp/func_length.cpp +++ b/utils/funcexp/func_length.cpp @@ -48,7 +48,8 @@ int64_t Func_length::getIntVal(rowgroup::Row& row, bool& isNull, CalpontSystemCatalog::ColType&) { - if (fp[0]->data()->resultType().colDataType == CalpontSystemCatalog::VARBINARY) + if ((fp[0]->data()->resultType().colDataType == CalpontSystemCatalog::VARBINARY) || + (fp[0]->data()->resultType().colDataType == CalpontSystemCatalog::BLOB)) return fp[0]->data()->getStrVal(row, isNull).length(); return strlen(fp[0]->data()->getStrVal(row, isNull).c_str()); diff --git a/utils/funcexp/funcexp.cpp b/utils/funcexp/funcexp.cpp index 675208f9a..6995cfa36 100644 --- a/utils/funcexp/funcexp.cpp +++ b/utils/funcexp/funcexp.cpp @@ -265,7 +265,9 @@ void FuncExp::evaluate(rowgroup::Row& row, std::vector& expressi break; } case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::VARCHAR: + case CalpontSystemCatalog::VARCHAR: + // TODO: might not be right thing for BLOB + case CalpontSystemCatalog::BLOB: { const std::string& val = expression[i]->getStrVal(row, isNull); if (isNull) diff --git a/utils/rowgroup/rowgroup.cpp b/utils/rowgroup/rowgroup.cpp index c3685ab57..484e22208 100644 --- a/utils/rowgroup/rowgroup.cpp +++ b/utils/rowgroup/rowgroup.cpp @@ -38,6 +38,7 @@ using namespace std; #include +#include using namespace boost; #include "bytestream.h" @@ -73,9 +74,9 @@ StringStore::~StringStore() uint64_t inUse = 0, allocated = 0; for (i = 0; i < mem.size(); i++) { - MemChunk *tmp = (MemChunk *) mem.back().get(); - inUse += tmp->currentSize; - allocated += tmp->capacity; + std::string *tmp = mem.back().get(); + inUse += tmp->length(); + allocated += tmp->length(); } if (allocated > 0) cout << "~SS: " << inUse << "/" << allocated << " = " << (float) inUse/(float) allocated << endl; @@ -84,7 +85,6 @@ StringStore::~StringStore() uint32_t StringStore::storeString(const uint8_t *data, uint32_t len) { - MemChunk *lastMC = NULL; uint32_t ret = 0; empty = false; // At least a NULL is being stored. @@ -102,31 +102,10 @@ uint32_t StringStore::storeString(const uint8_t *data, uint32_t len) if (fUseStoreStringMutex) lk.lock(); - if (mem.size() > 0) - lastMC = (MemChunk *) mem.back().get(); + shared_ptr newString(new std::string((char*)data, len)); + mem.push_back(newString); - if ((lastMC == NULL) || (lastMC->capacity - lastMC->currentSize < len)) { - // mem usage debugging - //if (lastMC) - //cout << "Memchunk efficiency = " << lastMC->currentSize << "/" << lastMC->capacity << endl; - shared_array newOne(new uint8_t[CHUNK_SIZE + sizeof(MemChunk)]); - mem.push_back(newOne); - lastMC = (MemChunk *) mem.back().get(); - lastMC->currentSize = 0; - lastMC->capacity = CHUNK_SIZE; - memset(lastMC->data, 0, CHUNK_SIZE); - } - - ret = ((mem.size()-1) * CHUNK_SIZE) + lastMC->currentSize; - memcpy(&(lastMC->data[lastMC->currentSize]), data, len); - /* - cout << "stored: '" << hex; - for (uint32_t i = 0; i < len ; i++) { - cout << (char) lastMC->data[lastMC->currentSize + i]; - } - cout << "' at position " << lastMC->currentSize << " len " << len << dec << endl; - */ - lastMC->currentSize += len; + ret = mem.size(); return ret; } @@ -134,16 +113,17 @@ uint32_t StringStore::storeString(const uint8_t *data, uint32_t len) void StringStore::serialize(ByteStream &bs) const { uint32_t i; - MemChunk *mc; - + std::string empty_str; + bs << (uint32_t) mem.size(); bs << (uint8_t) empty; for (i = 0; i < mem.size(); i++) { - mc = (MemChunk *) mem[i].get(); - bs << (uint32_t) mc->currentSize; + if (mem[i].get() == NULL) + bs << empty_str; + else + bs << *mem[i].get(); //cout << "serialized " << mc->currentSize << " bytes\n"; - bs.append(mc->data, mc->currentSize); - } + } } uint32_t StringStore::deserialize(ByteStream &bs) @@ -151,27 +131,22 @@ uint32_t StringStore::deserialize(ByteStream &bs) uint32_t i; uint32_t count; uint32_t size; - uint8_t *buf; - MemChunk *mc; + std::string buf; uint8_t tmp8; uint32_t ret = 0; //mem.clear(); bs >> count; - mem.resize(count); + mem.reserve(count); bs >> tmp8; empty = (bool) tmp8; ret += 5; for (i = 0; i < count; i++) { - bs >> size; //cout << "deserializing " << size << " bytes\n"; - buf = bs.buf(); - mem[i].reset(new uint8_t[size + sizeof(MemChunk)]); - mc = (MemChunk *) mem[i].get(); - mc->currentSize = size; - mc->capacity = size; - memcpy(mc->data, buf, size); - bs.advance(size); + bs >> buf; + shared_ptr newString(new std::string(buf)); + mem.push_back(newString); + //bs.advance(size); ret += (size + 4); } return ret; @@ -179,7 +154,7 @@ uint32_t StringStore::deserialize(ByteStream &bs) void StringStore::clear() { - vector > emptyv; + vector > emptyv; mem.swap(emptyv); empty = true; } @@ -365,7 +340,8 @@ string Row::toString() const else switch (types[i]) { case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::VARCHAR: { + case CalpontSystemCatalog::VARCHAR: + { const string &tmp = getStringField(i); os << "(" << getStringLength(i) << ") '" << tmp << "' "; break; @@ -381,7 +357,9 @@ string Row::toString() const case CalpontSystemCatalog::LONGDOUBLE: os << getLongDoubleField(i) << " "; break; - case CalpontSystemCatalog::VARBINARY: { + case CalpontSystemCatalog::VARBINARY: + case CalpontSystemCatalog::BLOB: + { uint32_t len = getVarBinaryLength(i); const uint8_t* val = getVarBinaryField(i); os << "0x" << hex; @@ -429,7 +407,9 @@ string Row::toCSV() const case CalpontSystemCatalog::LONGDOUBLE: os << getLongDoubleField(i); break; - case CalpontSystemCatalog::VARBINARY: { + case CalpontSystemCatalog::VARBINARY: + case CalpontSystemCatalog::BLOB: + { uint32_t len = getVarBinaryLength(i); const uint8_t* val = getVarBinaryField(i); os << "0x" << hex; @@ -532,6 +512,11 @@ void Row::initToNull() memset(&data[offsets[i]], 0xFF, getColumnWidth(i)); break; } + case CalpontSystemCatalog::BLOB: { + // TODO: no NULL value for long double yet, this is a nan. + memset(&data[offsets[i]], 0xFF, getColumnWidth(i)); + break; + } default: ostringstream os; os << "Row::initToNull(): got bad column type (" << types[i] << @@ -603,6 +588,7 @@ bool Row::isNullValue(uint32_t colIndex) const } break; } + case CalpontSystemCatalog::BLOB: case CalpontSystemCatalog::VARBINARY: { uint32_t pos = offsets[colIndex]; if (inStringTable(colIndex)) { @@ -1095,13 +1081,13 @@ void applyMapping(const int *mapping, const Row &in, Row *out) for (i = 0; i < in.getColumnCount(); i++) if (mapping[i] != -1) { - if (UNLIKELY(in.isLongString(i))) + if (UNLIKELY(in.getColTypes()[i] == execplan::CalpontSystemCatalog::VARBINARY || in.getColTypes()[i] == execplan::CalpontSystemCatalog::BLOB)) + out->setVarBinaryField(in.getVarBinaryField(i), in.getVarBinaryLength(i), mapping[i]); + else if (UNLIKELY(in.isLongString(i))) out->setStringField(in.getStringPointer(i), in.getStringLength(i), mapping[i]); //out->setStringField(in.getStringField(i), mapping[i]); else if (UNLIKELY(in.isShortString(i))) out->setUintField(in.getUintField(i), mapping[i]); - else if (UNLIKELY(in.getColTypes()[i] == execplan::CalpontSystemCatalog::VARBINARY)) - out->setVarBinaryField(in.getVarBinaryField(i), in.getVarBinaryLength(i), mapping[i]); else if (UNLIKELY(in.getColTypes()[i] == execplan::CalpontSystemCatalog::LONGDOUBLE)) out->setLongDoubleField(in.getLongDoubleField(i), mapping[i]); else if (in.isUnsigned(i)) diff --git a/utils/rowgroup/rowgroup.h b/utils/rowgroup/rowgroup.h index ef218b225..66d3ed8e2 100644 --- a/utils/rowgroup/rowgroup.h +++ b/utils/rowgroup/rowgroup.h @@ -119,14 +119,8 @@ private: // This is an overlay b/c the underlying data needs to be any size, // and alloc'd in one chunk. data can't be a sepatate dynamic chunk. - struct MemChunk - { - uint32_t currentSize; - uint32_t capacity; - uint8_t data[]; - }; - - std::vector > mem; + + std::vector > mem; bool empty; bool fUseStoreStringMutex; //@bug6065, make StringStore::storeString() thread safe boost::mutex fMutex; @@ -797,7 +791,8 @@ inline void Row::copyField(uint32_t destIndex, uint32_t srcIndex) const inline void Row::copyField(Row &out, uint32_t destIndex, uint32_t srcIndex) const { - if (UNLIKELY(types[srcIndex] == execplan::CalpontSystemCatalog::VARBINARY)) + if (UNLIKELY(types[srcIndex] == execplan::CalpontSystemCatalog::VARBINARY || + types[srcIndex] == execplan::CalpontSystemCatalog::BLOB)) out.setVarBinaryField(getVarBinaryStringField(srcIndex), destIndex); else if (UNLIKELY(isLongString(srcIndex))) out.setStringField(getStringPointer(srcIndex), getStringLength(srcIndex), destIndex); @@ -1268,7 +1263,8 @@ inline bool RowGroup::isLongString(uint32_t colIndex) const { return ((getColumnWidth(colIndex) > 7 && types[colIndex] == execplan::CalpontSystemCatalog::VARCHAR) || (getColumnWidth(colIndex) > 8 && types[colIndex] == execplan::CalpontSystemCatalog::CHAR) || - types[colIndex] == execplan::CalpontSystemCatalog::VARBINARY); + types[colIndex] == execplan::CalpontSystemCatalog::VARBINARY || + types[colIndex] == execplan::CalpontSystemCatalog::BLOB); } inline bool RowGroup::usesStringTable() const @@ -1421,7 +1417,7 @@ inline void copyRow(const Row &in, Row *out, uint32_t colCount) } for (uint32_t i = 0; i < colCount; i++) { - if (UNLIKELY(in.getColTypes()[i] == execplan::CalpontSystemCatalog::VARBINARY)) + if (UNLIKELY(in.getColTypes()[i] == execplan::CalpontSystemCatalog::VARBINARY || in.getColTypes()[i] == execplan::CalpontSystemCatalog::BLOB)) out->setVarBinaryField(in.getVarBinaryStringField(i), i); else if (UNLIKELY(in.isLongString(i))) //out->setStringField(in.getStringField(i), i); @@ -1445,17 +1441,13 @@ inline std::string StringStore::getString(uint32_t off, uint32_t len) const if (off == std::numeric_limits::max()) return joblist::CPNULLSTRMARK; - MemChunk *mc; - uint32_t chunk = off / CHUNK_SIZE; - uint32_t offset = off % CHUNK_SIZE; - // this has to handle uninitialized data as well. If it's uninitialized it doesn't matter - // what gets returned, it just can't go out of bounds. - if (mem.size() <= chunk) + if ((mem.size() < off) || off == 0) return joblist::CPNULLSTRMARK; - mc = (MemChunk *) mem[chunk].get(); - if ((offset + len) > mc->currentSize) - return joblist::CPNULLSTRMARK; - return std::string((char *) &(mc->data[offset]), len); + + if (mem[off-1].get() == NULL) + return joblist::CPNULLSTRMARK; + + return *mem[off-1].get(); } inline const uint8_t * StringStore::getPointer(uint32_t off) const @@ -1463,17 +1455,15 @@ inline const uint8_t * StringStore::getPointer(uint32_t off) const if (off == std::numeric_limits::max()) return (const uint8_t *) joblist::CPNULLSTRMARK.c_str(); - uint32_t chunk = off / CHUNK_SIZE; - uint32_t offset = off % CHUNK_SIZE; - MemChunk *mc; // this has to handle uninitialized data as well. If it's uninitialized it doesn't matter // what gets returned, it just can't go out of bounds. - if (UNLIKELY(mem.size() <= chunk)) + if (UNLIKELY(mem.size() < off)) return (const uint8_t *) joblist::CPNULLSTRMARK.c_str(); - mc = (MemChunk *) mem[chunk].get(); - if (offset > mc->currentSize) - return (const uint8_t *) joblist::CPNULLSTRMARK.c_str(); - return &(mc->data[offset]); + + if (off == 0 || (mem[off-1].get() == NULL)) + return (const uint8_t *) joblist::CPNULLSTRMARK.c_str(); + + return (uint8_t*)mem[off-1].get()->c_str(); } inline bool StringStore::isNullValue(uint32_t off, uint32_t len) const @@ -1484,17 +1474,15 @@ inline bool StringStore::isNullValue(uint32_t off, uint32_t len) const if (len < 8) return false; - uint32_t chunk = off / CHUNK_SIZE; - uint32_t offset = off % CHUNK_SIZE; - MemChunk *mc; - if (mem.size() <= chunk) + if ((mem.size() < off) || off == 0) return true; - mc = (MemChunk *) mem[chunk].get(); - if ((offset + len) > mc->currentSize) - return true; - if (mc->data[offset] == 0) // "" = NULL string for some reason... - return true; - return (*((uint64_t *) &mc->data[offset]) == *((uint64_t *) joblist::CPNULLSTRMARK.c_str())); + + if (mem[off-1].get() == NULL) + return true; + + if (mem[off-1].get()->empty()) // Empty string is NULL + return true; + return (mem[off-1].get()->compare(joblist::CPNULLSTRMARK) == 0); } inline bool StringStore::equals(const std::string &str, uint32_t off, uint32_t len) const @@ -1502,15 +1490,13 @@ inline bool StringStore::equals(const std::string &str, uint32_t off, uint32_t l if (off == std::numeric_limits::max() || len == 0) return str == joblist::CPNULLSTRMARK; - uint32_t chunk = off / CHUNK_SIZE; - uint32_t offset = off % CHUNK_SIZE; - if (mem.size() <= chunk) - return false; - MemChunk *mc = (MemChunk *) mem[chunk].get(); - if ((offset + len) > mc->currentSize) + if ((mem.size() < off) || off == 0) return false; - return (strncmp(str.c_str(), (const char *) &mc->data[offset], len) == 0); + if (mem[off-1].get() == NULL) + return false; + + return (mem[off-1].get()->compare(str) == 0); } inline bool StringStore::isEmpty() const @@ -1522,11 +1508,9 @@ inline uint64_t StringStore::getSize() const { uint32_t i; uint64_t ret = 0; - MemChunk *mc; - + for (i = 0; i < mem.size(); i++) { - mc = (MemChunk *) mem[i].get(); - ret += mc->capacity; + ret+= mem[i].get()->length(); } return ret; } diff --git a/writeengine/bulk/we_columninfo.cpp b/writeengine/bulk/we_columninfo.cpp index dda374750..6929611ee 100644 --- a/writeengine/bulk/we_columninfo.cpp +++ b/writeengine/bulk/we_columninfo.cpp @@ -1792,7 +1792,7 @@ int ColumnInfo::updateDctnryStore(char* buf, // column. // This only applies to default text mode. This step is bypassed for // binary imports, because in that case, the data is already true binary. - if ((curCol.colType == WR_VARBINARY) && + if (((curCol.colType == WR_VARBINARY) || (curCol.colType == WR_BLOB)) && (fpTableInfo->getImportDataMode() == IMPORT_DATA_TEXT)) { #ifdef PROFILE diff --git a/writeengine/dictionary/we_dctnry.cpp b/writeengine/dictionary/we_dctnry.cpp index 626ec9ea4..da669f3bf 100644 --- a/writeengine/dictionary/we_dctnry.cpp +++ b/writeengine/dictionary/we_dctnry.cpp @@ -64,7 +64,7 @@ namespace const int START_HDR1 = // start loc of 2nd offset (HDR1) HDR_UNIT_SIZE + NEXT_PTR_BYTES + HDR_UNIT_SIZE; const int PSEUDO_COL_WIDTH = DICT_COL_WIDTH; // used to convert row count to block count - + const int MAX_BLOB_SIZE = 2100000000; // for safety, we use an 18bit block count of 8KB blocks } namespace WriteEngine @@ -620,17 +620,105 @@ bool Dctnry::getTokenFromArray(Signature& sig) * token - token that was assigned to the inserted signature * * RETURN: - * none + * success - successfully write the signature to the block + * failure - failed to extend/create an extent for the block ******************************************************************************/ -void Dctnry::insertDctnry2(Signature& sig) +int Dctnry::insertDctnry2(Signature& sig) { - insertDctnryHdr(m_curBlock.data, - sig.size); - insertSgnture(m_curBlock.data, sig.size, (unsigned char*)sig.signature); + int rc = 0; + int write_size; + bool lbid_in_token = false; - sig.token.fbo = m_curLbid; - sig.token.op = m_curOp; - sig.token.spare = 0U; + sig.token.bc = 0; + + while (sig.size > 0) + { + if (sig.size > (m_freeSpace - m_totalHdrBytes)) + { + write_size = (m_freeSpace - m_totalHdrBytes); + } + else + { + write_size = sig.size; + } + + insertDctnryHdr(m_curBlock.data, write_size); + insertSgnture(m_curBlock.data, write_size, (unsigned char*)sig.signature); + + sig.size -= write_size; + sig.signature += write_size; + m_curFbo = m_lastFbo; + + if (!lbid_in_token) + { + sig.token.fbo = m_curLbid; + sig.token.op = m_curOp; + lbid_in_token = true; + } + + if (sig.size > 0) + { + CommBlock cb; + cb.file.oid = m_dctnryOID; + cb.file.pFile = m_dFile; + sig.token.bc++; + + RETURN_ON_ERROR( writeDBFileNoVBCache(cb, &m_curBlock, m_curFbo) ); + memset( m_curBlock.data, 0, sizeof(m_curBlock.data)); + memcpy( m_curBlock.data, &m_dctnryHeader2, m_totalHdrBytes); + m_freeSpace = BYTE_PER_BLOCK - m_totalHdrBytes; + m_curBlock.state = BLK_WRITE; + m_curOp =0; + m_lastFbo++; + m_curFbo = m_lastFbo; + + //...Expand current extent if it is an abbreviated initial extent + if ((m_curFbo == m_numBlocks) && + (m_numBlocks == NUM_BLOCKS_PER_INITIAL_EXTENT)) + { + RETURN_ON_ERROR( expandDctnryExtent() ); + } + + //...Allocate a new extent if we have reached the last block in the + // current extent. + if (m_curFbo == m_numBlocks) + {//last block + //for roll back the extent to use + //Save those empty extents in case of failure to rollback + std::vector dictExtentInfo; + ExtentInfo info; + info.oid = m_dctnryOID; + info.partitionNum = m_partition; + info.segmentNum = m_segment; + info.dbRoot = m_dbRoot; + info.hwm = m_hwm; + info.newFile = false; + dictExtentInfo.push_back (info); + LBID_t startLbid; + // Add an extent. + rc = createDctnry(m_dctnryOID, + 0, // dummy column width + m_dbRoot, + m_partition, + m_segment, + startLbid, + false) ; + if ( rc != NO_ERROR ) + { + //roll back the extent + BRMWrapper::getInstance()->deleteEmptyDictStoreExtents( + dictExtentInfo); + return rc; + } + } + RETURN_ON_ERROR( BRMWrapper::getInstance()->getBrmInfo(m_dctnryOID, + m_partition, m_segment, + m_curFbo, m_curLbid) ); + m_curBlock.lbid = m_curLbid; + + } + } + return NO_ERROR; } /******************************************************************************* @@ -706,7 +794,7 @@ int Dctnry::insertDctnry(const char* buf, // still check against max size & set to null token if needed. if ((curSig.size == 0) || (curSig.size == COLPOSPAIR_NULL_TOKEN_OFFSET) || - (curSig.size > MAX_SIGNATURE_SIZE)) + (curSig.size > MAX_BLOB_SIZE)) { if (m_defVal.length() > 0) // use default string if available { @@ -736,7 +824,9 @@ int Dctnry::insertDctnry(const char* buf, } //...Search for the string in our string cache - if (m_arraySize < MAX_STRING_CACHE_SIZE) + //if it fits into one block (< 8KB) + if ((m_arraySize < MAX_STRING_CACHE_SIZE) && + (curSig.size <= MAX_SIGNATURE_SIZE)) { //Stats::startParseEvent("getTokenFromArray"); found = getTokenFromArray(curSig); @@ -750,14 +840,15 @@ int Dctnry::insertDctnry(const char* buf, } //Stats::stopParseEvent("getTokenFromArray"); } - totalUseSize = HDR_UNIT_SIZE + curSig.size; + totalUseSize = m_totalHdrBytes + curSig.size; //...String not found in cache, so proceed. // If room is available in current block then insert into block. // @bug 3960: Add MAX_OP_COUNT check to handle case after bulk rollback - if( (totalUseSize <= m_freeSpace) && + if( ((totalUseSize <= m_freeSpace) || + ((curSig.size > 8176) && (m_freeSpace > m_totalHdrBytes))) && (m_curOp < (MAX_OP_COUNT-1)) ) { - insertDctnry2(curSig); //m_freeSpace updated! + RETURN_ON_ERROR(insertDctnry2(curSig)); //m_freeSpace updated! m_curBlock.state = BLK_WRITE; memcpy( pOut + outOffset, &curSig.token, 8 ); outOffset += 8; @@ -778,7 +869,9 @@ int Dctnry::insertDctnry(const char* buf, } //...Add string to cache, if we have not exceeded cache limit - if (m_arraySize < MAX_STRING_CACHE_SIZE) + // Don't cache big blobs + if ((m_arraySize < MAX_STRING_CACHE_SIZE) && + (curSig.size <= MAX_SIGNATURE_SIZE)) { addToStringCache( curSig ); } @@ -878,14 +971,15 @@ int Dctnry::insertDctnry(const char* buf, // we need to add the string to the new block. if (!found) { - insertDctnry2(curSig); //m_freeSpace updated! + RETURN_ON_ERROR(insertDctnry2(curSig)); //m_freeSpace updated! m_curBlock.state = BLK_WRITE; memcpy( pOut + outOffset, &curSig.token, 8 ); outOffset += 8; startPos++; //...Add string to cache, if we have not exceeded cache limit - if (m_arraySize < MAX_STRING_CACHE_SIZE) + if ((m_arraySize < MAX_STRING_CACHE_SIZE) && + (curSig.size <= MAX_SIGNATURE_SIZE)) { addToStringCache( curSig ); } @@ -943,9 +1037,12 @@ int Dctnry::insertDctnry(const int& sgnature_size, int i; unsigned char* value = NULL; int size; - if (sgnature_size > MAX_SIGNATURE_SIZE) + int write_size; + bool lbid_in_token = false; + // Round down for safety. In theory we can take 262143 * 8176 bytes + if (sgnature_size > MAX_BLOB_SIZE) { - return ERR_DICT_SIZE_GT_8000; + return ERR_DICT_SIZE_GT_2G; } if (sgnature_size == 0) { @@ -960,23 +1057,41 @@ int Dctnry::insertDctnry(const int& sgnature_size, size = sgnature_size; value = (unsigned char*)sgnature_value; + token.bc = 0; for (i = m_lastFbo; i < m_numBlocks; i++) { // @bug 3960: Add MAX_OP_COUNT check to handle case after bulk rollback - if( (m_freeSpace>= (size + HDR_UNIT_SIZE)) && + if( ((m_freeSpace>= (size + m_totalHdrBytes)) || + ((size > 8176) && (m_freeSpace > m_totalHdrBytes))) && (m_curOp < (MAX_OP_COUNT-1)) ) { // found the perfect block; signature size fit in this block - insertDctnryHdr(m_curBlock.data, size); - insertSgnture(m_curBlock.data, size, value); + if (size > (m_freeSpace - m_totalHdrBytes)) + { + write_size = (m_freeSpace - m_totalHdrBytes); + } + else + { + write_size = size; + } + insertDctnryHdr(m_curBlock.data, write_size); + insertSgnture(m_curBlock.data, write_size, value); + size -= write_size; + value += write_size; m_curBlock.state = BLK_WRITE; - token.fbo = m_curLbid; - token.op = m_curOp; - token.spare = 0; + // We only want the start LBID for a multi-block dict in the token + if (!lbid_in_token) + { + token.fbo = m_curLbid; + token.op = m_curOp; + lbid_in_token = true; + } + if (size > 0) + token.bc++; m_lastFbo = i; m_curFbo = m_lastFbo; - if (m_curOp < (MAX_OP_COUNT-1)) + if ((m_curOp < (MAX_OP_COUNT-1)) && (size <= 0)) return NO_ERROR; }//end Found diff --git a/writeengine/dictionary/we_dctnry.h b/writeengine/dictionary/we_dctnry.h index d3b55896c..9faa1da9e 100644 --- a/writeengine/dictionary/we_dctnry.h +++ b/writeengine/dictionary/we_dctnry.h @@ -239,7 +239,7 @@ protected: // insertDctnryHdr inserts the new value info into the header. // insertSgnture inserts the new value into the block. // - void insertDctnry2(Signature& sig); + int insertDctnry2(Signature& sig); void insertDctnryHdr( unsigned char* blockBuf, const int& size); void insertSgnture(unsigned char* blockBuf, const int& size, unsigned char*value); diff --git a/writeengine/server/we_ddlcommandproc.cpp b/writeengine/server/we_ddlcommandproc.cpp index 2ccdd08ae..4703b02eb 100644 --- a/writeengine/server/we_ddlcommandproc.cpp +++ b/writeengine/server/we_ddlcommandproc.cpp @@ -450,7 +450,8 @@ uint8_t WE_DDLCommandProc::writeCreateSyscolumn(ByteStream& bs, std::string &err bool hasDict = false; if ( (dataType == CalpontSystemCatalog::CHAR && colDefPtr->fType->fLength > 8) || (dataType == CalpontSystemCatalog::VARCHAR && colDefPtr->fType->fLength > 7) || - (dataType == CalpontSystemCatalog::VARBINARY && colDefPtr->fType->fLength > 7) ) + (dataType == CalpontSystemCatalog::VARBINARY && colDefPtr->fType->fLength > 7) || + (dataType == CalpontSystemCatalog::BLOB && colDefPtr->fType->fLength > 7) ) { hasDict = true; dictOID.compressionType = colDefPtr->fType->fCompressiontype; @@ -459,17 +460,20 @@ uint8_t WE_DDLCommandProc::writeCreateSyscolumn(ByteStream& bs, std::string &err dictcol++; //@Bug 2534. Take away the limit of 255 and set the limit to 8000. - if (colDefPtr->fType->fLength > 8000) + if ((colDefPtr->fType->fLength > 8000) && + (dataType != CalpontSystemCatalog::BLOB)) { ostringstream os; os << "char, varchar and varbinary length may not exceed 8000"; throw std::runtime_error(os.str()); } } - else if (dataType == CalpontSystemCatalog::VARBINARY && colDefPtr->fType->fLength <= 7) + else if ((dataType == CalpontSystemCatalog::VARBINARY + || dataType == CalpontSystemCatalog::BLOB) + && colDefPtr->fType->fLength <= 7) { ostringstream os; - os << "varbinary length may not be less than 8"; + os << "varbinary and blob length may not be less than 8"; throw std::runtime_error(os.str()); } @@ -514,7 +518,8 @@ uint8_t WE_DDLCommandProc::writeCreateSyscolumn(ByteStream& bs, std::string &err //@Bug 2089 Disallow zero length char and varch column to be created if (dataType == CalpontSystemCatalog::CHAR || dataType == CalpontSystemCatalog::VARCHAR || - dataType == CalpontSystemCatalog::VARBINARY) + dataType == CalpontSystemCatalog::VARBINARY || + dataType == CalpontSystemCatalog::BLOB) { if (colDefPtr->fType->fLength <= 0) { @@ -829,17 +834,20 @@ uint8_t WE_DDLCommandProc::writeSyscolumn(ByteStream& bs, std::string & err) dictOID.dictOID = dictoid; //@Bug 2534. Take away the limit of 255 and set the limit to 8000. - if (colDefPtr->fType->fLength > 8000) + if ((colDefPtr->fType->fLength > 8000) && + (dataType != CalpontSystemCatalog::BLOB)) { ostringstream os; os << "char, varchar and varbinary length may not exceed 8000"; throw std::runtime_error(os.str()); } } - else if (dataType == CalpontSystemCatalog::VARBINARY && colDefPtr->fType->fLength <= 7) + else if ((dataType == CalpontSystemCatalog::VARBINARY + || dataType == CalpontSystemCatalog::BLOB) + && colDefPtr->fType->fLength <= 7) { ostringstream os; - os << "varbinary length may not be less than 8"; + os << "varbinary and blob length may not be less than 8"; throw std::runtime_error(os.str()); } @@ -885,7 +893,8 @@ uint8_t WE_DDLCommandProc::writeSyscolumn(ByteStream& bs, std::string & err) //@Bug 2089 Disallow zero length char and varch column to be created if (dataType == CalpontSystemCatalog::CHAR || dataType == CalpontSystemCatalog::VARCHAR || - dataType == CalpontSystemCatalog::VARBINARY) + dataType == CalpontSystemCatalog::VARBINARY || + dataType == CalpontSystemCatalog::BLOB) { if (colDefPtr->fType->fLength <= 0) { @@ -2242,6 +2251,8 @@ uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string && column.colType.colWidth > 7) || (column.colType.colDataType == CalpontSystemCatalog::VARBINARY && column.colType.colWidth > 7) + || (column.colType.colDataType == CalpontSystemCatalog::BLOB + && column.colType.colWidth > 7) || (column.colType.colDataType == CalpontSystemCatalog::DECIMAL && column.colType.precision > 18) || (column.colType.colDataType == CalpontSystemCatalog::UDECIMAL @@ -2293,7 +2304,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string //It's the same string for each column, so we just need one dictionary struct memset(&dictTuple, 0, sizeof(dictTuple)); - memcpy(dictTuple.sigValue, newTablename.c_str(), newTablename.length()); + dictTuple.sigValue = (unsigned char*)newTablename.c_str(); dictTuple.sigSize = newTablename.length(); dictTuple.isNull = false; dctColList = dictTuple; @@ -2628,7 +2639,7 @@ uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string & dictStruct.columnOid = column.colType.columnOID; WriteEngine::DctnryTuple dictTuple; dictTuple.isNull = false; - memcpy(dictTuple.sigValue, newTablename.c_str(), newTablename.length()); + dictTuple.sigValue = (unsigned char*)newTablename.c_str(); dictTuple.sigSize = newTablename.length(); if (idbdatafile::IDBPolicy::useHdfs()) @@ -2844,7 +2855,7 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string dictStruct.columnOid = column.colType.columnOID; WriteEngine::DctnryTuple dictTuple; dictTuple.isNull = false; - memcpy(dictTuple.sigValue, newTablename.c_str(), newTablename.length()); + dictTuple.sigValue = (unsigned char*)newTablename.c_str(); dictTuple.sigSize = newTablename.length(); //int error = NO_ERROR; //if (NO_ERROR != (error = fWEWrapper.tokenize(txnID, dictStruct, dictTuple))) @@ -2998,6 +3009,8 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string && column.colType.colWidth > 7) || (column.colType.colDataType == CalpontSystemCatalog::VARBINARY && column.colType.colWidth > 7) + || (column.colType.colDataType == CalpontSystemCatalog::BLOB + && column.colType.colWidth > 7) || (column.colType.colDataType == CalpontSystemCatalog::DECIMAL && column.colType.precision > 18) || (column.colType.colDataType == CalpontSystemCatalog::UDECIMAL @@ -3015,7 +3028,7 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string //Tokenize the data value dictStruct.dctnryOid = column.colType.ddn.dictOID; dictStruct.columnOid = column.colType.columnOID; - memcpy(dictTuple.sigValue, newTablename.c_str(), newTablename.length()); + dictTuple.sigValue = (unsigned char*)newTablename.c_str(); dictTuple.sigSize = newTablename.length(); dictTuple.isNull = false; /* @@ -3066,7 +3079,7 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string //It's the same string for each column, so we just need one dictionary struct memset(&dictTuple, 0, sizeof(dictTuple)); - memcpy(dictTuple.sigValue, newTablename.c_str(), newTablename.length()); + dictTuple.sigValue = (unsigned char*)newTablename.c_str(); dictTuple.sigSize = newTablename.length(); dictTuple.isNull = false; dctColList = dictTuple; @@ -3878,6 +3891,8 @@ uint8_t WE_DDLCommandProc::updateSyscolumnSetDefault(messageqcpp::ByteStream& bs && column.colType.colWidth > 7) || (column.colType.colDataType == CalpontSystemCatalog::VARBINARY && column.colType.colWidth > 7) + || (column.colType.colDataType == CalpontSystemCatalog::BLOB + && column.colType.colWidth > 7) || (column.colType.colDataType == CalpontSystemCatalog::DECIMAL && column.colType.precision > 18) || (column.colType.colDataType == CalpontSystemCatalog::UDECIMAL @@ -3906,7 +3921,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnSetDefault(messageqcpp::ByteStream& bs else { WriteEngine::DctnryTuple dictTuple; - memcpy(dictTuple.sigValue, defaultvalue.c_str(), defaultvalue.length()); + dictTuple.sigValue = (unsigned char*)defaultvalue.c_str(); dictTuple.sigSize = defaultvalue.length(); dictTuple.isNull = false; int error = NO_ERROR; @@ -3952,7 +3967,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnSetDefault(messageqcpp::ByteStream& bs WriteEngine::DctnryTuple dctnryTuple; if(defaultvalue.length() > 0) { - memcpy(dctnryTuple.sigValue, defaultvalue.c_str(), defaultvalue.length()); + dctnryTuple.sigValue = (unsigned char*)defaultvalue.c_str(); dctnryTuple.sigSize = defaultvalue.length(); dctnryTuple.isNull = false; } @@ -4146,6 +4161,8 @@ uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream& && column1.colType.colWidth > 7) || (column1.colType.colDataType == CalpontSystemCatalog::VARBINARY && column1.colType.colWidth > 7) + || (column1.colType.colDataType == CalpontSystemCatalog::BLOB + && column1.colType.colWidth > 7) || (column1.colType.colDataType == CalpontSystemCatalog::DECIMAL && column1.colType.precision > 18) || (column1.colType.colDataType == CalpontSystemCatalog::UDECIMAL @@ -4170,7 +4187,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream& dictStruct.dctnryOid = column1.colType.ddn.dictOID; dictStruct.columnOid = column1.colType.columnOID; WriteEngine::DctnryTuple dictTuple; - memcpy(dictTuple.sigValue, colNewName.c_str(), colNewName.length()); + dictTuple.sigValue = (unsigned char*)colNewName.c_str(); dictTuple.sigSize = colNewName.length(); dictTuple.isNull = false; int error = NO_ERROR; @@ -4221,7 +4238,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream& colValuesList.push_back(aColList1); WriteEngine::DctnryTuple dctnryTuple; boost::to_lower(colNewName); - memcpy(dctnryTuple.sigValue, colNewName.c_str(), colNewName.length()); + dctnryTuple.sigValue = (unsigned char*)colNewName.c_str(); dctnryTuple.sigSize = colNewName.length(); dctnryTuple.isNull = false; dctColList = dctnryTuple; @@ -4336,6 +4353,8 @@ uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream& && column5.colType.colWidth > 7) || (column5.colType.colDataType == CalpontSystemCatalog::VARBINARY && column5.colType.colWidth > 7) + || (column5.colType.colDataType == CalpontSystemCatalog::BLOB + && column5.colType.colWidth > 7) || (column5.colType.colDataType == CalpontSystemCatalog::DECIMAL && column5.colType.precision > 18) || (column5.colType.colDataType == CalpontSystemCatalog::UDECIMAL @@ -4369,7 +4388,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream& else { WriteEngine::DctnryTuple dictTuple; - memcpy(dictTuple.sigValue, defaultvalue.c_str(), defaultvalue.length()); + dictTuple.sigValue = (unsigned char*)defaultvalue.c_str(); dictTuple.sigSize = defaultvalue.length(); dictTuple.isNull = false; int error = NO_ERROR; @@ -4418,7 +4437,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream& if(defaultvalue.length() > 0) { - memcpy(dctnryTuple.sigValue, defaultvalue.c_str(), defaultvalue.length()); + dctnryTuple.sigValue = (unsigned char*)defaultvalue.c_str(); dctnryTuple.sigSize = defaultvalue.length(); dctnryTuple.isNull = false; } diff --git a/writeengine/server/we_ddlcommon.h b/writeengine/server/we_ddlcommon.h index 827511646..e83e2d97f 100644 --- a/writeengine/server/we_ddlcommon.h +++ b/writeengine/server/we_ddlcommon.h @@ -316,6 +316,7 @@ inline boost::any getNullValueForType(const execplan::CalpontSystemCatalog::ColT } break; + case execplan::CalpontSystemCatalog::BLOB: case execplan::CalpontSystemCatalog::VARBINARY: { std::string charnull; diff --git a/writeengine/server/we_dmlcommandproc.cpp b/writeengine/server/we_dmlcommandproc.cpp index 0afb5f342..e60c48bb0 100644 --- a/writeengine/server/we_dmlcommandproc.cpp +++ b/writeengine/server/we_dmlcommandproc.cpp @@ -2027,7 +2027,7 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs, nameNeeded = true; } WriteEngine::DctnryTuple dctTuple; - memcpy(dctTuple.sigValue, value.c_str(), value.length()); + dctTuple.sigValue = (unsigned char*)value.c_str(); dctTuple.sigSize = value.length(); dctTuple.isNull = false; @@ -2071,6 +2071,7 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs, break; } case CalpontSystemCatalog::VARBINARY: + case CalpontSystemCatalog::BLOB: { value = row.getVarBinaryStringField(fetchColPos); break; @@ -2203,7 +2204,7 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs, } WriteEngine::DctnryTuple dctTuple; - memcpy(dctTuple.sigValue, value.c_str(), value.length()); + dctTuple.sigValue = (unsigned char*)value.c_str(); dctTuple.sigSize = value.length(); dctTuple.isNull = false; @@ -2253,7 +2254,7 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs, nameNeeded = true; } WriteEngine::DctnryTuple dctTuple; - memcpy(dctTuple.sigValue, value.c_str(), value.length()); + dctTuple.sigValue = (unsigned char*)value.c_str(); dctTuple.sigSize = value.length(); dctTuple.isNull = false; error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType); @@ -2293,7 +2294,7 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs, nameNeeded = true; } WriteEngine::DctnryTuple dctTuple; - memcpy(dctTuple.sigValue, value.c_str(), value.length()); + dctTuple.sigValue = (unsigned char*)value.c_str(); dctTuple.sigSize = value.length(); dctTuple.isNull = false; error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType); @@ -2356,6 +2357,7 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs, break; } case CalpontSystemCatalog::VARBINARY: + case CalpontSystemCatalog::BLOB: { value = row.getVarBinaryStringField(fetchColPos); break; diff --git a/writeengine/server/we_dmlcommandproc.h b/writeengine/server/we_dmlcommandproc.h index b92462bd7..eacdddf5b 100644 --- a/writeengine/server/we_dmlcommandproc.h +++ b/writeengine/server/we_dmlcommandproc.h @@ -110,7 +110,8 @@ class WE_DMLCommandProc || ((colType.colDataType == execplan::CalpontSystemCatalog::VARCHAR) && (colType.colWidth > 7)) || ((colType.colDataType == execplan::CalpontSystemCatalog::DECIMAL) && (colType.precision > 18)) || ((colType.colDataType == execplan::CalpontSystemCatalog::UDECIMAL) && (colType.precision > 18)) - || (colType.colDataType == execplan::CalpontSystemCatalog::VARBINARY)) + || (colType.colDataType == execplan::CalpontSystemCatalog::VARBINARY) + || (colType.colDataType == execplan::CalpontSystemCatalog::BLOB)) { return true; } diff --git a/writeengine/shared/we_chunkmanager.cpp b/writeengine/shared/we_chunkmanager.cpp index b0d40ff3d..fce6cec98 100644 --- a/writeengine/shared/we_chunkmanager.cpp +++ b/writeengine/shared/we_chunkmanager.cpp @@ -1587,6 +1587,9 @@ int ChunkManager::calculateHeaderSize(int width) int rowsPerExtent = BRMWrapper::getInstance()->getExtentRows(); int rowsPerFile = rowsPerExtent * extentsPerFile; int stringsPerBlock = 8180 / (width + 2); // 8180 = 8192 - 12 + // BLOB is 1 string per block + if (stringsPerBlock == 0) + stringsPerBlock = 1; int blocksNeeded = rowsPerFile / stringsPerBlock; int blocksPerChunk = UNCOMPRESSED_CHUNK_SIZE / BYTE_PER_BLOCK; lldiv_t chunks = lldiv(blocksNeeded, blocksPerChunk); diff --git a/writeengine/shared/we_define.cpp b/writeengine/shared/we_define.cpp index 34a8459c3..3bec2bc0c 100644 --- a/writeengine/shared/we_define.cpp +++ b/writeengine/shared/we_define.cpp @@ -135,7 +135,7 @@ WErrorCodes::WErrorCodes() : fErrorCodes() // Dictionary error fErrorCodes[ERR_DICT_NO_SPACE_INSERT] = " no space for a dictionary insert"; - fErrorCodes[ERR_DICT_SIZE_GT_8000] = " the dictionary size was >8000"; + fErrorCodes[ERR_DICT_SIZE_GT_2G] = " the dictionary size was > 2GB"; fErrorCodes[ERR_DICT_NO_OP_DELETE] = " in the dictionary no op delete"; fErrorCodes[ERR_DICT_NO_OFFSET_DELETE] = " a dictionary bad Delete offset"; fErrorCodes[ERR_DICT_INVALID_HDR] = " a dictionary bad Delete Hdr"; diff --git a/writeengine/shared/we_define.h b/writeengine/shared/we_define.h index f86110e16..743966aeb 100644 --- a/writeengine/shared/we_define.h +++ b/writeengine/shared/we_define.h @@ -231,7 +231,7 @@ namespace WriteEngine // Dictionary error //-------------------------------------------------------------------------- const int ERR_DICT_NO_SPACE_INSERT= ERR_DCTNRYBASE+ 1; // ins no space - const int ERR_DICT_SIZE_GT_8000 = ERR_DCTNRYBASE+ 2; // ins size >8000 + const int ERR_DICT_SIZE_GT_2G = ERR_DCTNRYBASE+ 2; // ins size >8000 const int ERR_DICT_NO_OP_DELETE = ERR_DCTNRYBASE+ 3; // del no op const int ERR_DICT_NO_OFFSET_DELETE=ERR_DCTNRYBASE+ 4; // del bad offset const int ERR_DICT_INVALID_HDR = ERR_DCTNRYBASE+ 5; // Delete Hdr diff --git a/writeengine/shared/we_type.h b/writeengine/shared/we_type.h index 4091a911e..919d2d27c 100644 --- a/writeengine/shared/we_type.h +++ b/writeengine/shared/we_type.h @@ -290,7 +290,7 @@ namespace WriteEngine struct DctnryTuple /** @brief Dictionary Tuple struct*/ { - unsigned char sigValue[MAX_SIGNATURE_SIZE]; /** @brief dictionary signature value*/ + unsigned char *sigValue; /** @brief dictionary signature value*/ int sigSize; /** @brief dictionary signature size */ Token token; /** @brief dictionary token */ bool isNull; diff --git a/writeengine/shared/we_typeext.h b/writeengine/shared/we_typeext.h index caff00379..cc23dab56 100644 --- a/writeengine/shared/we_typeext.h +++ b/writeengine/shared/we_typeext.h @@ -45,12 +45,12 @@ namespace WriteEngine struct Token { uint64_t op : 10; // ordinal position within a block uint64_t fbo : 36; // file block number - uint64_t spare : 18; // spare + uint64_t bc : 18; // block count Token() // constructor, set to null value { op = 0x3FE; fbo = 0xFFFFFFFFFLL; - spare = 0x3FFFF; + bc = 0x3FFFF; } }; diff --git a/writeengine/wrapper/we_colop.cpp b/writeengine/wrapper/we_colop.cpp index b14a1c225..f1c8bfa23 100644 --- a/writeengine/wrapper/we_colop.cpp +++ b/writeengine/wrapper/we_colop.cpp @@ -692,7 +692,7 @@ int ColumnOp::fillColumn(const TxnID& txnid, Column& column, Column& refCol, voi dctnryStruct.colWidth = dictColWidth; dctnryStruct.fCompressionType = column.compressionType; DctnryTuple dctnryTuple; - memcpy(dctnryTuple.sigValue, defaultValStr.c_str(), defaultValStr.length()); + dctnryTuple.sigValue = (unsigned char*)defaultValStr.c_str(); dctnryTuple.sigSize = defaultValStr.length(); rc = dctnry->openDctnry(dctnryStruct.dctnryOid, @@ -761,7 +761,7 @@ int ColumnOp::fillColumn(const TxnID& txnid, Column& column, Column& refCol, voi dctnryStruct.colWidth = dictColWidth; dctnryStruct.fCompressionType = column.compressionType; DctnryTuple dctnryTuple; - memcpy(dctnryTuple.sigValue, defaultValStr.c_str(), defaultValStr.length()); + dctnryTuple.sigValue = (unsigned char*)defaultValStr.c_str(); //WriteEngineWrapper wrapper; dctnryTuple.sigSize = defaultValStr.length(); //rc = wrapper.tokenize(txnid, dctnryStruct, dctnryTuple); @@ -1454,6 +1454,7 @@ int ColumnOp::addExtent( //pOldVal = &((double *) oldValArray)[i]; break; case WriteEngine::WR_VARBINARY : // treat same as char for now + case WriteEngine::WR_BLOB : case WriteEngine::WR_CHAR : if (!bDelete) { @@ -1588,6 +1589,7 @@ int ColumnOp::addExtent( //pOldVal = &((double *) oldValArray)[i]; break; case WriteEngine::WR_VARBINARY : // treat same as char for now + case WriteEngine::WR_BLOB : case WriteEngine::WR_CHAR : if (!bDelete) { @@ -1717,6 +1719,7 @@ int ColumnOp::addExtent( pVal = &((double *) valArray)[i]; break; case WriteEngine::WR_VARBINARY : // treat same as char for now + case WriteEngine::WR_BLOB : case WriteEngine::WR_CHAR : { memcpy(charTmpBuf, (char*)valArray+i*8, 8); diff --git a/writeengine/wrapper/writeengine.cpp b/writeengine/wrapper/writeengine.cpp index d4d0e3553..00c5239df 100644 --- a/writeengine/wrapper/writeengine.cpp +++ b/writeengine/wrapper/writeengine.cpp @@ -624,7 +624,7 @@ int WriteEngineWrapper::fillColumn(const TxnID& txnid, const OID& dataOid, DctnryTuple dctnryTuple; DctColTupleList dctColTuples; - memcpy(dctnryTuple.sigValue, tmpStr.c_str(), tmpStr.length()); + dctnryTuple.sigValue = (unsigned char*)tmpStr.c_str(); dctnryTuple.sigSize = tmpStr.length(); dctnryTuple.isNull = true; dctColTuples.push_back (dctnryTuple); @@ -1200,7 +1200,7 @@ timer.stop("allocRowId"); timer.start("tokenize"); #endif DctnryTuple dctTuple; - memcpy(dctTuple.sigValue, dctStr_iter->c_str(), dctStr_iter->length()); + dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str(); dctTuple.sigSize = dctStr_iter->length(); dctTuple.isNull = false; rc = tokenize(txnid, dctTuple, dctnryStructList[i].fCompressionType); @@ -1248,7 +1248,7 @@ timer.stop("tokenize"); timer.start("tokenize"); #endif DctnryTuple dctTuple; - memcpy(dctTuple.sigValue, dctStr_iter->c_str(), dctStr_iter->length()); + dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str(); dctTuple.sigSize = dctStr_iter->length(); dctTuple.isNull = false; rc = tokenize(txnid, dctTuple, newDctnryStructList[i].fCompressionType); @@ -1710,7 +1710,7 @@ timer.start("allocRowId"); timer.start("tokenize"); #endif DctnryTuple dctTuple; - memcpy(dctTuple.sigValue, dctStr_iter->c_str(), dctStr_iter->length()); + dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str(); dctTuple.sigSize = dctStr_iter->length(); dctTuple.isNull = false; rc = tokenize(txnid, dctTuple, dctnryStructList[i].fCompressionType); @@ -1776,7 +1776,7 @@ timer.stop("tokenize"); timer.start("tokenize"); #endif DctnryTuple dctTuple; - memcpy(dctTuple.sigValue, dctStr_iter->c_str(), dctStr_iter->length()); + dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str(); dctTuple.sigSize = dctStr_iter->length(); dctTuple.isNull = false; rc = tokenize(txnid, dctTuple, newDctnryStructList[i].fCompressionType); @@ -2333,7 +2333,7 @@ timer.stop("allocRowId"); timer.start("tokenize"); #endif DctnryTuple dctTuple; - memcpy(dctTuple.sigValue, dctStr_iter->c_str(), dctStr_iter->length()); + dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str(); dctTuple.sigSize = dctStr_iter->length(); dctTuple.isNull = false; rc = tokenize(txnid, @@ -2403,7 +2403,7 @@ timer.stop("tokenize"); timer.start("tokenize"); #endif DctnryTuple dctTuple; - memcpy(dctTuple.sigValue, dctStr_iter->c_str(), dctStr_iter->length()); + dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str(); dctTuple.sigSize = dctStr_iter->length(); dctTuple.isNull = false; rc = tokenize(txnid,