diff --git a/VERSION b/VERSION index 445a70a7a..aef96eefc 100644 --- a/VERSION +++ b/VERSION @@ -1,4 +1,4 @@ COLUMNSTORE_VERSION_MAJOR=1 COLUMNSTORE_VERSION_MINOR=1 -COLUMNSTORE_VERSION_PATCH=4 +COLUMNSTORE_VERSION_PATCH=5 COLUMNSTORE_VERSION_RELEASE=1 diff --git a/dbcon/ddlpackage/CMakeLists.txt b/dbcon/ddlpackage/CMakeLists.txt index 27d2a3015..ae2f82fa9 100644 --- a/dbcon/ddlpackage/CMakeLists.txt +++ b/dbcon/ddlpackage/CMakeLists.txt @@ -9,6 +9,10 @@ ADD_CUSTOM_COMMAND( DEPENDS ddl.y ddl.l ) +# Parser puts extra info to stderr. +INCLUDE(../../check_compiler_flag.cmake) +MY_CHECK_AND_SET_COMPILER_FLAG("-DYYDEBUG" DEBUG) + ########### next target ############### set(ddlpackage_LIB_SRCS diff --git a/dbcon/ddlpackage/ddl.l b/dbcon/ddlpackage/ddl.l index ac51fe020..6eeaafb0b 100644 --- a/dbcon/ddlpackage/ddl.l +++ b/dbcon/ddlpackage/ddl.l @@ -18,6 +18,7 @@ /* $Id: ddl.l 9341 2013-03-27 14:10:35Z chao $ */ %{ +#include #include #include #include @@ -31,10 +32,11 @@ #endif using namespace ddlpackage; +typedef enum { NOOP, STRIP_QUOTES } copy_action_t; int lineno = 1; void ddlerror(struct pass_to_bison* x, char const *s); -static char* scanner_copy(char *str, yyscan_t yyscanner); +static char* scanner_copy(char *str, yyscan_t yyscanner, copy_action_t action = NOOP ); %} @@ -54,6 +56,10 @@ horiz_space [ \t\f] newline [\n\r] non_newline [^\n\r] +quote ' +double_quote \" +grave_accent ` + comment ("--"{non_newline}*) self [,()\[\].;\:\+\-\*\/\%\^\<\>\=] whitespace ({space}+|{comment}) @@ -62,6 +68,10 @@ digit [0-9] ident_start [A-Za-z\200-\377_] ident_cont [A-Za-z\200-\377_0-9\$] identifier {ident_start}{ident_cont}* +/* fully qualified names regexes */ +fq_identifier {identifier}\.{identifier} +identifier_quoted {grave_accent}{identifier}{grave_accent} +identifier_double_quoted {double_quote}{identifier}{double_quote} integer [-+]?{digit}+ decimal ([-+]?({digit}*\.{digit}+)|({digit}+\.{digit}*)) @@ -69,11 +79,13 @@ real ({integer}|{decimal})[Ee][-+]?{digit}+ realfail1 ({integer}|{decimal})[Ee] realfail2 ({integer}|{decimal})[Ee][-+] -quote ' -grave_accent ` %% + +{identifier_quoted} { ddlget_lval(yyscanner)->str = scanner_copy( ddlget_text(yyscanner), yyscanner, STRIP_QUOTES ); return IDENT; } +{identifier_double_quoted} { ddlget_lval(yyscanner)->str = scanner_copy( ddlget_text(yyscanner), yyscanner, STRIP_QUOTES ); return DQ_IDENT; } + ACTION {return ACTION;} ADD {return ADD;} ALTER {return ALTER;} @@ -171,14 +183,14 @@ LONGTEXT {return LONGTEXT;} /* ignore */ } -{identifier} {ddlget_lval(yyscanner)->str = scanner_copy(ddlget_text(yyscanner), yyscanner); return IDENT;} +{identifier} { ddlget_lval(yyscanner)->str = scanner_copy(ddlget_text(yyscanner), yyscanner); return DQ_IDENT;} {self} { return ddlget_text(yyscanner)[0]; } {grave_accent} { - /* ignore */ + return ddlget_text(yyscanner)[0]; } %% @@ -198,6 +210,11 @@ using namespace ddlpackage; */ void scanner_init(const char* str, yyscan_t yyscanner) { +#ifdef YYDEBUG + extern int ddldebug; + ddldebug = 1; +#endif + size_t slen = strlen(str); scan_data* pScanData = (scan_data*)ddlget_extra(yyscanner); @@ -246,10 +263,20 @@ void scanner_finish(yyscan_t yyscanner) pScanData->valbuf.clear(); } -char* scanner_copy (char *str, yyscan_t yyscanner) +char* scanner_copy (char *str, yyscan_t yyscanner, copy_action_t action) { - char* nv = strdup(str); - if(nv) - ((scan_data*)ddlget_extra(yyscanner))->valbuf.push_back(nv); - return nv; + char* result; + char* nv = strdup(str); + result = nv; + // free strduped memory later to prevent possible memory leak 
+ if(nv) + ((scan_data*)ddlget_extra(yyscanner))->valbuf.push_back(nv); + + if(action == STRIP_QUOTES) + { + nv[strlen(str) - 1] = '\0'; + result = nv + 1; + } + + return result; } diff --git a/dbcon/ddlpackage/ddl.y b/dbcon/ddlpackage/ddl.y index 8d36b2c2b..96867cfb8 100644 --- a/dbcon/ddlpackage/ddl.y +++ b/dbcon/ddlpackage/ddl.y @@ -29,20 +29,13 @@ Understanding the New Sql book The postgress and mysql sources. find x -name \*.y -o -name \*.yy. - We don't support delimited identifiers. + We support quoted identifiers. All literals are stored as unconverted strings. You can't say "NOT DEFERRABLE". See the comment below. - This is not a reentrant parser. It uses the original global - variable style method of communication between the parser and - scanner. If we ever needed more than one parser thread per - processes, we would use the pure/reentrant options of bison and - flex. In that model, things that are traditionally global live - inside a struct that is passed around. We would need to upgrade to - a more recent version of flex. At the time of this writing, our - development systems have: flex version 2.5.4 + This is a reentrant parser. MCOL-66 Modify to be a reentrant parser */ @@ -121,7 +114,7 @@ REFERENCES RENAME RESTRICT SET SMALLINT TABLE TEXT TIME TINYBLOB TINYTEXT TINYINT TO UNIQUE UNSIGNED UPDATE USER SESSION_USER SYSTEM_USER VARCHAR VARBINARY VARYING WITH ZONE DOUBLE IDB_FLOAT REAL CHARSET IDB_IF EXISTS CHANGE TRUNCATE -%token IDENT FCONST SCONST CP_SEARCH_CONDITION_TEXT ICONST DATE +%token DQ_IDENT IDENT FCONST SCONST CP_SEARCH_CONDITION_TEXT ICONST DATE /* Notes: * 1. "ata" stands for alter_table_action @@ -205,6 +198,7 @@ VARYING WITH ZONE DOUBLE IDB_FLOAT REAL CHARSET IDB_IF EXISTS CHANGE TRUNCATE %type opt_if_exists %type opt_if_not_exists %type trunc_table_statement +%type ident %% stmtblock: stmtmulti { x->fParseTree = $1; } @@ -464,7 +458,7 @@ opt_equal: ; table_option: - ENGINE opt_equal IDENT {$$ = new pair("engine", $3);} + ENGINE opt_equal ident {$$ = new pair("engine", $3);} | MAX_ROWS opt_equal ICONST {$$ = new pair("max_rows", $3);} | @@ -479,9 +473,9 @@ table_option: $$ = new pair("auto_increment", $3); } | - DEFAULT CHARSET opt_equal IDENT {$$ = new pair("default charset", $4);} + DEFAULT CHARSET opt_equal ident {$$ = new pair("default charset", $4);} | - DEFAULT IDB_CHAR SET opt_equal IDENT {$$ = new pair("default charset", $5);} + DEFAULT IDB_CHAR SET opt_equal ident {$$ = new pair("default charset", $5);} ; alter_table_statement: @@ -611,15 +605,23 @@ table_name: ; qualified_name: - IDENT '.' IDENT {$$ = new QualifiedName($1, $3);} - | IDENT { + | ident { if (x->fDBSchema.size()) $$ = new QualifiedName((char*)x->fDBSchema.c_str(), $1); else $$ = new QualifiedName($1); } + | ident '.' ident + { + $$ = new QualifiedName($1, $3); + } ; +ident: + DQ_IDENT + | IDENT + ; + ata_add_column: /* See the documentation for SchemaObject for an explanation of why we are using * dynamic_cast here. 
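// Aside, not part of the patch: a minimal standalone sketch of what the new
// STRIP_QUOTES action in ddl.l's scanner_copy() does with a quoted identifier
// such as `mytable` or "mytable". The token text is duplicated, the trailing
// quote is overwritten with a terminator, and the pointer handed back to the
// parser skips the leading quote; the duplicated buffer itself (not the
// shifted pointer) is what must be kept for later cleanup, which is why the
// patch pushes nv rather than the returned pointer into valbuf. Names below
// are illustrative only.

#include <cstdlib>
#include <cstring>

static char* strip_quotes_copy(const char* token)
{
    char* nv = strdup(token);        // e.g. "`mytable`"
    if (nv == NULL)
        return NULL;
    nv[strlen(nv) - 1] = '\0';       // drop the trailing quote -> "`mytable"
    return nv + 1;                   // skip the leading quote  -> "mytable"
    // caller must remember nv (return value - 1) if it wants to free() later
}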
@@ -632,11 +634,11 @@ ata_add_column: column_name: DATE - |IDENT + |ident ; constraint_name: - IDENT + ident ; column_option: @@ -696,6 +698,10 @@ default_clause: { $$ = new ColumnDefaultValue($2); } + | DEFAULT DQ_IDENT /* MCOL-1406 */ + { + $$ = new ColumnDefaultValue($2); + } | DEFAULT NULL_TOK {$$ = new ColumnDefaultValue(NULL);} | DEFAULT USER {$$ = new ColumnDefaultValue("$USER");} | DEFAULT CURRENT_USER {$$ = new ColumnDefaultValue("$CURRENT_USER");} diff --git a/dbcon/joblist/jlf_execplantojoblist.cpp b/dbcon/joblist/jlf_execplantojoblist.cpp index b6242ba77..cfd694f9b 100644 --- a/dbcon/joblist/jlf_execplantojoblist.cpp +++ b/dbcon/joblist/jlf_execplantojoblist.cpp @@ -1500,10 +1500,7 @@ const JobStepVector doSimpleFilter(SimpleFilter* sf, JobInfo& jobInfo) return doExpressionFilter(sf, jobInfo); } - // trim trailing space char in the predicate string constval(cc->constval()); - size_t spos = constval.find_last_not_of(" "); - if (spos != string::npos) constval = constval.substr(0, spos+1); CalpontSystemCatalog::OID dictOid = 0; CalpontSystemCatalog::ColType ct = sc->colType(); @@ -2569,10 +2566,7 @@ const JobStepVector doConstantFilter(const ConstantFilter* cf, JobInfo& jobInfo) if (ConstantColumn::NULLDATA == cc->type() && (opeq == *sop || opne == *sop)) cop = COMPARE_NIL; - // trim trailing space char string value = cc->constval(); - size_t spos = value.find_last_not_of(" "); - if (spos != string::npos) value = value.substr(0, spos+1); pds->addFilter(cop, value); } @@ -2652,10 +2646,7 @@ const JobStepVector doConstantFilter(const ConstantFilter* cf, JobInfo& jobInfo) if (ConstantColumn::NULLDATA == cc->type() && (opeq == *sop || opne == *sop)) cop = COMPARE_NIL; - // trim trailing space char string value = cc->constval(); - size_t spos = value.find_last_not_of(" "); - if (spos != string::npos) value = value.substr(0, spos+1); pds->addFilter(cop, value); } @@ -2759,9 +2750,6 @@ const JobStepVector doConstantFilter(const ConstantFilter* cf, JobInfo& jobInfo) int8_t cop = op2num(sop); int64_t value = 0; string constval = cc->constval(); - // trim trailing space char - size_t spos = constval.find_last_not_of(" "); - if (spos != string::npos) constval = constval.substr(0, spos+1); // @bug 1151 string longer than colwidth of char/varchar. uint8_t rf = 0; diff --git a/dbcon/joblist/tupleaggregatestep.cpp b/dbcon/joblist/tupleaggregatestep.cpp index b5cde967b..e68bc9f18 100755 --- a/dbcon/joblist/tupleaggregatestep.cpp +++ b/dbcon/joblist/tupleaggregatestep.cpp @@ -958,6 +958,7 @@ void TupleAggregateStep::prep1PhaseAggregate( vector functionVec; uint32_t bigIntWidth = sizeof(int64_t); uint32_t bigUintWidth = sizeof(uint64_t); + uint32_t projColsUDAFIndex = 0; mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL; // for count column of average function @@ -1135,18 +1136,27 @@ void TupleAggregateStep::prep1PhaseAggregate( SP_ROWAGG_FUNC_t funct; if (aggOp == ROWAGG_UDAF) - { - UDAFColumn* udafc = dynamic_cast(jobInfo.projectionCols[i].get()); - if (udafc) - { - // Create a RowAggFunctionCol (UDAF subtype) with the context. 
- funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, i)); - } - else - { - throw logic_error("prep1PhasesAggregate: A UDAF function is called but there's no UDAFColumn"); - } - } + { + std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex; + for (; it != jobInfo.projectionCols.end(); it++) + { + UDAFColumn* udafc = dynamic_cast((*it).get()); + projColsUDAFIndex++; + if (udafc) + { + pUDAFFunc = udafc->getContext().getFunction(); + // Create a RowAggFunctionCol (UDAF subtype) with the context. + funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, i)); + break; + } + + } + + if (it == jobInfo.projectionCols.end()) + { + throw logic_error("prep1PhaseAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s"); + } + } else { funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, i)); @@ -1484,6 +1494,7 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( AGG_MAP aggFuncMap; mcsv1sdk::mcsv1_UDAF* pUDAFFunc = NULL; set avgSet; + uint32_t projColsUDAFIndex = 0; // for count column of average function map avgFuncMap, avgDistFuncMap; @@ -1636,19 +1647,27 @@ void TupleAggregateStep::prep1PhaseDistinctAggregate( SP_ROWAGG_FUNC_t funct; if (aggOp == ROWAGG_UDAF) - { - UDAFColumn* udafc = dynamic_cast(jobInfo.projectionCols[i].get()); - if (udafc) - { - pUDAFFunc = udafc->getContext().getFunction(); - // Create a RowAggFunctionCol (UDAF subtype) with the context. - funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAgg)); - } - else - { - throw logic_error("prep1PhaseDistinctAggregate: A UDAF function is called but there's no UDAFColumn"); - } - } + { + std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex; + for (; it != jobInfo.projectionCols.end(); it++) + { + UDAFColumn* udafc = dynamic_cast((*it).get()); + projColsUDAFIndex++; + if (udafc) + { + pUDAFFunc = udafc->getContext().getFunction(); + // Create a RowAggFunctionCol (UDAF subtype) with the context. + funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAgg)); + break; + } + + } + + if (it == jobInfo.projectionCols.end()) + { + throw logic_error("prep1PhaseDistinctAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s"); + } + } else { funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAgg)); @@ -2579,6 +2598,7 @@ void TupleAggregateStep::prep2PhasesAggregate( vector > aggColVec; set avgSet; vector >& returnedColVec = jobInfo.returnedColVec; + uint32_t projColsUDAFIndex = 0; for (uint64_t i = 0; i < returnedColVec.size(); i++) { // skip if not an aggregation column @@ -2746,18 +2766,26 @@ void TupleAggregateStep::prep2PhasesAggregate( SP_ROWAGG_FUNC_t funct; if (aggOp == ROWAGG_UDAF) { - UDAFColumn* udafc = dynamic_cast(jobInfo.projectionCols[i].get()); - if (udafc) - { - pUDAFFunc = udafc->getContext().getFunction(); - // Create a RowAggFunctionCol (UDAF subtype) with the context. - funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm)); - } - else - { - throw logic_error("prep2PhasesAggregate: A UDAF function is called but there's no UDAFColumn"); - } - } + std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex; + for (; it != jobInfo.projectionCols.end(); it++) + { + UDAFColumn* udafc = dynamic_cast((*it).get()); + projColsUDAFIndex++; + if (udafc) + { + pUDAFFunc = udafc->getContext().getFunction(); + // Create a RowAggFunctionCol (UDAF subtype) with the context. 
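// Aside, not part of the patch: the four prep*Aggregate() changes in this
// file share one pattern -- instead of assuming projectionCols[i] is the UDAF
// column, they scan forward from a running index so that each ROWAGG_UDAF
// occurrence consumes the next UDAFColumn in projection order, and throw if
// none is left. A simplified sketch of that lookup, with illustrative names
// (FuncCol, UdafCol, findNextUdaf):

#include <memory>
#include <stdexcept>
#include <vector>

struct FuncCol { virtual ~FuncCol() {} };   // stand-in for the projection column base type
struct UdafCol : FuncCol { /* would carry the UDAF context */ };

static UdafCol* findNextUdaf(const std::vector<std::shared_ptr<FuncCol> >& projCols,
                             size_t& nextIndex)   // persists across calls
{
    while (nextIndex < projCols.size())
    {
        UdafCol* udafc = dynamic_cast<UdafCol*>(projCols[nextIndex++].get());
        if (udafc)
            return udafc;                         // first unconsumed UDAF column
    }
    throw std::logic_error("UDAF aggregation requested but no UDAFColumn left");
}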
+ funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm)); + break; + } + + } + + if (it == jobInfo.projectionCols.end()) + { + throw logic_error("prep2PhasesAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s"); + } + } else { funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAggPm)); @@ -3301,6 +3329,7 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( vector > aggColVec, aggNoDistColVec; set avgSet, avgDistSet; vector >& returnedColVec = jobInfo.returnedColVec; + uint32_t projColsUDAFIndex = 0; for (uint64_t i = 0; i < returnedColVec.size(); i++) { // col should be an aggregate or groupBy or window function @@ -3498,18 +3527,25 @@ void TupleAggregateStep::prep2PhasesDistinctAggregate( SP_ROWAGG_FUNC_t funct; if (aggOp == ROWAGG_UDAF) { - UDAFColumn* udafc = dynamic_cast(jobInfo.projectionCols[i].get()); - if (udafc) - { - pUDAFFunc = udafc->getContext().getFunction(); - // Create a RowAggFunctionCol (UDAF subtype) with the context. - funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm)); - } - else - { - throw logic_error("prep2PhasesDistinctAggregate: A UDAF function is called but there's no UDAFColumn"); - } - } + std::vector::iterator it = jobInfo.projectionCols.begin() + projColsUDAFIndex; + for (; it != jobInfo.projectionCols.end(); it++) + { + UDAFColumn* udafc = dynamic_cast((*it).get()); + projColsUDAFIndex++; + if (udafc) + { + pUDAFFunc = udafc->getContext().getFunction(); + // Create a RowAggFunctionCol (UDAF subtype) with the context. + funct.reset(new RowUDAFFunctionCol(udafc->getContext(), colProj, colAggPm)); + break; + } + } + + if (it == jobInfo.projectionCols.end()) + { + throw logic_error("prep2PhasesDistinctAggregate: A UDAF function is called but there's no/not enough UDAFColumn/-s"); + } + } else { funct.reset(new RowAggFunctionCol(aggOp, stats, colProj, colAggPm)); diff --git a/dbcon/joblist/tupleunion.cpp b/dbcon/joblist/tupleunion.cpp index 2fdd4330f..2ae631abb 100644 --- a/dbcon/joblist/tupleunion.cpp +++ b/dbcon/joblist/tupleunion.cpp @@ -47,7 +47,7 @@ using namespace dataconvert; #endif namespace { //returns the value of 10 raised to the power x. -inline double pow10(double x) +inline double exp10(double x) { return exp(x * M_LN10); } @@ -406,7 +406,7 @@ void TupleUnion::normalize(const Row &in, Row *out) ostringstream os; if (in.getScale(i)) { double d = in.getIntField(i); - d /= pow10(in.getScale(i)); + d /= exp10(in.getScale(i)); os.precision(15); os << d; } @@ -488,7 +488,7 @@ dec1: uint64_t val = in.getIntField(i); ostringstream os; if (in.getScale(i)) { double d = in.getUintField(i); - d /= pow10(in.getScale(i)); + d /= exp10(in.getScale(i)); os.precision(15); os << d; } diff --git a/dbcon/mysql/ha_calpont_ddl.cpp b/dbcon/mysql/ha_calpont_ddl.cpp index 611f1da3b..9e757e426 100755 --- a/dbcon/mysql/ha_calpont_ddl.cpp +++ b/dbcon/mysql/ha_calpont_ddl.cpp @@ -2039,51 +2039,53 @@ int ha_calpont_impl_delete_table_(const char *db, const char *name, cal_connecti int ha_calpont_impl_rename_table_(const char* from, const char* to, cal_connection_info& ci) { - THD *thd = current_thd; - string emsg; + THD* thd = current_thd; + string emsg; - ostringstream stmt1; - pair fromPair; - pair toPair; - string stmt; + pair fromPair; + pair toPair; + string stmt; - //this is replcated DDL, treat it just like SSO - if (thd->slave_thread) - return 0; + //this is replcated DDL, treat it just like SSO + if (thd->slave_thread) + return 0; - //@bug 5660. Error out REAL DDL/DML on slave node. 
- // When the statement gets here, it's NOT SSO or RESTRICT - if (ci.isSlaveNode) - { - string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_DML_DDL_SLAVE); - setError(current_thd, ER_CHECK_NOT_IMPLEMENTED, emsg); - return 1; - } + //@bug 5660. Error out REAL DDL/DML on slave node. + // When the statement gets here, it's NOT SSO or RESTRICT + if (ci.isSlaveNode) + { + string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_DML_DDL_SLAVE); + setError(current_thd, ER_CHECK_NOT_IMPLEMENTED, emsg); + return 1; + } - fromPair = parseTableName(from); - toPair = parseTableName(to); + fromPair = parseTableName(from); + toPair = parseTableName(to); - if (fromPair.first != toPair.first) - { - thd->get_stmt_da()->set_overwrite_status(true); - thd->raise_error_printf(ER_CHECK_NOT_IMPLEMENTED, "Both tables must be in the same database to use RENAME TABLE"); - return -1; - } + if (fromPair.first != toPair.first) + { + thd->get_stmt_da()->set_overwrite_status(true); + thd->raise_error_printf(ER_CHECK_NOT_IMPLEMENTED, "Both tables must be in the same database to use RENAME TABLE"); + return -1; + } - stmt1 << "alter table " << fromPair.second << " rename to " << toPair.second << ";"; + // This explicitely shields both db objects with quotes that the lexer strips down later. + stmt = "alter table `" + fromPair.second + "` rename to `" + toPair.second + "`;"; + string db; - stmt = stmt1.str(); - string db; - if ( fromPair.first.length() !=0 ) - db = fromPair.first; - else if ( thd->db ) - db = thd->db; + if ( thd->db ) + db = thd->db; + else if ( fromPair.first.length() != 0 ) + db = fromPair.first; + else + db = toPair.first; - int rc = ProcessDDLStatement(stmt, db, "", tid2sid(thd->thread_id), emsg); - if (rc != 0) - push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, emsg.c_str()); + int rc = ProcessDDLStatement(stmt, db, "", tid2sid(thd->thread_id), emsg); - return rc; + if (rc != 0) + push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, emsg.c_str()); + + return rc; } @@ -2121,7 +2123,7 @@ long long calonlinealter(UDF_INIT* initid, UDF_ARGS* args, int rc = ProcessDDLStatement(stmt, db, "", tid2sid(thd->thread_id), emsg, compressiontype); if (rc != 0) - push_warning(thd, Sql_condition::WARN_LEVEL_ERROR, 9999, emsg.c_str()); + push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, emsg.c_str()); return rc; } diff --git a/dbcon/mysql/ha_calpont_execplan.cpp b/dbcon/mysql/ha_calpont_execplan.cpp index 0f54d8bdb..a30d71688 100755 --- a/dbcon/mysql/ha_calpont_execplan.cpp +++ b/dbcon/mysql/ha_calpont_execplan.cpp @@ -801,8 +801,8 @@ uint32_t buildOuterJoin(gp_walk_info& gwi, SELECT_LEX& select_lex) // View is already processed in view::transform // @bug5319. view is sometimes treated as derived table and // fromSub::transform does not build outer join filters. - //if (!table_ptr->derived && table_ptr->view) - // continue; + if (!table_ptr->derived && table_ptr->view) + continue; CalpontSystemCatalog:: TableAliasName tan = make_aliasview( (table_ptr->db ? table_ptr->db : ""), @@ -4099,7 +4099,6 @@ void gp_walk(const Item *item, void *arg) Item_string* isp = (Item_string*)item; if (isp) { - // @bug 3669. 
trim trailing spaces for the compare value if (isp->result_type() == STRING_RESULT) { String val, *str = isp->val_str(&val); @@ -4108,9 +4107,6 @@ void gp_walk(const Item *item, void *arg) { cval.assign(str->ptr(), str->length()); } - size_t spos = cval.find_last_not_of(" "); - if (spos != string::npos) - cval = cval.substr(0, spos+1); gwip->rcWorkStack.push(new ConstantColumn(cval)); break; } diff --git a/dbcon/mysql/is_columnstore_columns.cpp b/dbcon/mysql/is_columnstore_columns.cpp index 21c9e748e..b446eed4e 100644 --- a/dbcon/mysql/is_columnstore_columns.cpp +++ b/dbcon/mysql/is_columnstore_columns.cpp @@ -27,6 +27,8 @@ #include #include "calpontsystemcatalog.h" #include "dataconvert.h" +#include "exceptclasses.h" +using namespace logging; // Required declaration as it isn't in a MairaDB include @@ -70,7 +72,22 @@ static int is_columnstore_columns_fill(THD *thd, TABLE_LIST *tables, COND *cond) for (std::vector >::const_iterator it = catalog_tables.begin(); it != catalog_tables.end(); ++it) { - execplan::CalpontSystemCatalog::RIDList column_rid_list = systemCatalogPtr->columnRIDs((*it).second, true); + execplan::CalpontSystemCatalog::RIDList column_rid_list; + // Note a table may get dropped as you iterate over the list of tables. + // So simply ignore the dropped table. + try { + column_rid_list = systemCatalogPtr->columnRIDs((*it).second, true); + } + catch (IDBExcept& ex) + { + if (ex.errorCode() == ERR_TABLE_NOT_IN_CATALOG) { + continue; + } + else { + return 1; + } + } + for (size_t col_num = 0; col_num < column_rid_list.size(); col_num++) { execplan::CalpontSystemCatalog::TableColName tcn = systemCatalogPtr->colName(column_rid_list[col_num].objnum); diff --git a/oam/cloud/MCSVolumeCmds.sh b/oam/cloud/MCSVolumeCmds.sh index 291d27e44..c7a231261 100755 --- a/oam/cloud/MCSVolumeCmds.sh +++ b/oam/cloud/MCSVolumeCmds.sh @@ -202,7 +202,7 @@ detachvolume() { checkInfostatus if [ $STATUS == "detaching" ]; then retries=1 - while [ $retries -ne 60 ]; do + while [ $retries -ne 10 ]; do #retry until it's attached $AWSCLI detach-volume --volume-id $volumeName --region $Region > /tmp/volumeInfo_$volumeName 2>&1 @@ -239,7 +239,7 @@ attachvolume() { checkInfostatus if [ $STATUS == "attaching" -o $STATUS == "already-attached" ]; then retries=1 - while [ $retries -ne 60 ]; do + while [ $retries -ne 10 ]; do #check status until it's attached describevolume if [ $STATUS == "attached" ]; then diff --git a/oam/install_scripts/syslogSetup.sh b/oam/install_scripts/syslogSetup.sh index 1f4235a30..78b292dac 100755 --- a/oam/install_scripts/syslogSetup.sh +++ b/oam/install_scripts/syslogSetup.sh @@ -13,6 +13,7 @@ syslog_conf=nofile rsyslog7=0 user=`whoami 2>/dev/null` +group=user SUDO=" " if [ "$user" != "root" ]; then @@ -167,9 +168,24 @@ if [ ! 
-z "$syslog_conf" ] ; then # remove older version incase it was installed by previous build $SUDO rm -rf /etc/rsyslog.d/columnstore.conf + // determine username/groupname + + if [ -f /var/log/messages ]; then + user=`stat -c "%U %G" /var/log/messages | awk '{print $1}'` + group=`stat -c "%U %G" /var/log/messages | awk '{print $2}'` + fi + + if [ -f /var/log/syslog ]; then + user=`stat -c "%U %G" /var/log/syslog | awk '{print $1}'` + group=`stat -c "%U %G" /var/log/syslog | awk '{print $2}'` + fi + + //set permissions + $SUDO chown $user:$group -R /var/log/mariadb > /dev/null 2>&1 + if [ $rsyslog7 == 1 ]; then - sed -i -e s/groupname/adm/g ${columnstoreSyslogFile7} - sed -i -e s/username/syslog/g ${columnstoreSyslogFile7} + sed -i -e s/groupname/$group/g ${columnstoreSyslogFile7} + sed -i -e s/username/$user/g ${columnstoreSyslogFile7} $SUDO rm -f /etc/rsyslog.d/49-columnstore.conf $SUDO cp ${columnstoreSyslogFile7} ${syslog_conf} diff --git a/oam/oamcpp/liboamcpp.cpp b/oam/oamcpp/liboamcpp.cpp index 649d86f13..9a405e978 100644 --- a/oam/oamcpp/liboamcpp.cpp +++ b/oam/oamcpp/liboamcpp.cpp @@ -5477,6 +5477,35 @@ namespace oam exceptionControl("autoMovePmDbroot", API_INVALID_PARAMETER); } + //detach first to make sure DBS can be detach before trying to move to another pm + DBRootConfigList::iterator pt3 = residedbrootConfigList.begin(); + for( ; pt3 != residedbrootConfigList.end() ; pt3++ ) + { + int dbrootID = *pt3; + + try + { + typedef std::vector dbrootList; + dbrootList dbrootlist; + dbrootlist.push_back(itoa(dbrootID)); + + amazonDetach(dbrootlist); + } + catch (exception& ) + { + writeLog("ERROR: amazonDetach failure", LOG_TYPE_ERROR ); + + //reattach + typedef std::vector dbrootList; + dbrootList dbrootlist; + dbrootlist.push_back(itoa(dbrootID)); + + amazonAttach(residePM, dbrootlist); + + exceptionControl("autoMovePmDbroot", API_DETACH_FAILURE); + } + } + //get dbroot id for other PMs systemStorageInfo_t t; DeviceDBRootList moduledbrootlist; @@ -5951,9 +5980,8 @@ namespace oam } if (!found) { - writeLog("ERROR: no dbroots found in ../Calpont/local/moveDbrootTransactionLog", LOG_TYPE_ERROR ); - cout << "ERROR: no dbroots found in " << fileName << endl; - exceptionControl("autoUnMovePmDbroot", API_FAILURE); + writeLog("No dbroots found in ../Calpont/local/moveDbrootTransactionLog", LOG_TYPE_DEBUG ); + cout << "No dbroots found in " << fileName << endl; } oldFile.close(); @@ -7248,7 +7276,7 @@ namespace oam else return; - // check if mysql-Capont is installed + // check if mysql-Columnstore is installed string mysqlscript = InstallDir + "/mysql/mysql-Columnstore"; if (access(mysqlscript.c_str(), X_OK) != 0) return; @@ -9644,6 +9672,146 @@ namespace oam } /*************************************************************************** + * + * Function: amazonDetach + * + * Purpose: Amazon EC2 volume deattach needed + * + ****************************************************************************/ + + void Oam::amazonDetach(dbrootList dbrootConfigList) + { + //if amazon cloud with external volumes, do the detach/attach moves + string cloud; + string DBRootStorageType; + try { + getSystemConfig("Cloud", cloud); + getSystemConfig("DBRootStorageType", DBRootStorageType); + } + catch(...) 
{} + + if ( (cloud == "amazon-ec2" || cloud == "amazon-vpc") && + DBRootStorageType == "external" ) + { + writeLog("amazonDetach function started ", LOG_TYPE_DEBUG ); + + dbrootList::iterator pt3 = dbrootConfigList.begin(); + for( ; pt3 != dbrootConfigList.end() ; pt3++) + { + string dbrootid = *pt3; + string volumeNameID = "PMVolumeName" + dbrootid; + string volumeName = oam::UnassignedName; + string deviceNameID = "PMVolumeDeviceName" + dbrootid; + string deviceName = oam::UnassignedName; + try { + getSystemConfig( volumeNameID, volumeName); + getSystemConfig( deviceNameID, deviceName); + } + catch(...) + {} + + if ( volumeName == oam::UnassignedName || deviceName == oam::UnassignedName ) + { + cout << " ERROR: amazonDetach, invalid configure " + volumeName + ":" + deviceName << endl; + writeLog("ERROR: amazonDetach, invalid configure " + volumeName + ":" + deviceName, LOG_TYPE_ERROR ); + exceptionControl("amazonDetach", API_INVALID_PARAMETER); + } + + //send msg to to-pm to umount volume + int returnStatus = sendMsgToProcMgr(UNMOUNT, dbrootid, FORCEFUL, ACK_YES); + if (returnStatus != API_SUCCESS) { + writeLog("ERROR: amazonDetach, umount failed on " + dbrootid, LOG_TYPE_ERROR ); + } + + if (!detachEC2Volume(volumeName)) { + cout << " ERROR: amazonDetach, detachEC2Volume failed on " + volumeName << endl; + writeLog("ERROR: amazonDetach, detachEC2Volume failed on " + volumeName , LOG_TYPE_ERROR ); + exceptionControl("amazonDetach", API_FAILURE); + } + + writeLog("amazonDetach, detachEC2Volume passed on " + volumeName , LOG_TYPE_DEBUG ); + } + } + } + + /*************************************************************************** + * + * Function: amazonAttach + * + * Purpose: Amazon EC2 volume Attach needed + * + ****************************************************************************/ + + void Oam::amazonAttach(std::string toPM, dbrootList dbrootConfigList) + { + //if amazon cloud with external volumes, do the detach/attach moves + string cloud; + string DBRootStorageType; + try { + getSystemConfig("Cloud", cloud); + getSystemConfig("DBRootStorageType", DBRootStorageType); + } + catch(...) {} + + if ( (cloud == "amazon-ec2" || cloud == "amazon-vpc") && + DBRootStorageType == "external" ) + { + writeLog("amazonAttach function started ", LOG_TYPE_DEBUG ); + + //get Instance Name for to-pm + string toInstanceName = oam::UnassignedName; + try + { + ModuleConfig moduleconfig; + getSystemConfig(toPM, moduleconfig); + HostConfigList::iterator pt1 = moduleconfig.hostConfigList.begin(); + toInstanceName = (*pt1).HostName; + } + catch(...) + {} + + if ( toInstanceName == oam::UnassignedName || toInstanceName.empty() ) + { + cout << " ERROR: amazonAttach, invalid Instance Name for " << toPM << endl; + writeLog("ERROR: amazonAttach, invalid Instance Name " + toPM, LOG_TYPE_ERROR ); + exceptionControl("amazonAttach", API_INVALID_PARAMETER); + } + + dbrootList::iterator pt3 = dbrootConfigList.begin(); + for( ; pt3 != dbrootConfigList.end() ; pt3++) + { + string dbrootid = *pt3; + string volumeNameID = "PMVolumeName" + dbrootid; + string volumeName = oam::UnassignedName; + string deviceNameID = "PMVolumeDeviceName" + dbrootid; + string deviceName = oam::UnassignedName; + try { + getSystemConfig( volumeNameID, volumeName); + getSystemConfig( deviceNameID, deviceName); + } + catch(...) 
+ {} + + if ( volumeName == oam::UnassignedName || deviceName == oam::UnassignedName ) + { + cout << " ERROR: amazonAttach, invalid configure " + volumeName + ":" + deviceName << endl; + writeLog("ERROR: amazonAttach, invalid configure " + volumeName + ":" + deviceName, LOG_TYPE_ERROR ); + exceptionControl("amazonAttach", API_INVALID_PARAMETER); + } + + if (!attachEC2Volume(volumeName, deviceName, toInstanceName)) { + cout << " ERROR: amazonAttach, attachEC2Volume failed on " + volumeName + ":" + deviceName + ":" + toInstanceName << endl; + writeLog("ERROR: amazonAttach, attachEC2Volume failed on " + volumeName + ":" + deviceName + ":" + toInstanceName, LOG_TYPE_ERROR ); + exceptionControl("amazonAttach", API_FAILURE); + } + + writeLog("amazonAttach, attachEC2Volume passed on " + volumeName + ":" + toPM, LOG_TYPE_DEBUG ); + } + } + } + + + /*************************************************************************** * * Function: amazonReattach * @@ -9736,6 +9904,7 @@ namespace oam } } + /*************************************************************************** * * Function: mountDBRoot diff --git a/oam/oamcpp/liboamcpp.h b/oam/oamcpp/liboamcpp.h index 51c1f773c..e5011407c 100644 --- a/oam/oamcpp/liboamcpp.h +++ b/oam/oamcpp/liboamcpp.h @@ -229,6 +229,7 @@ namespace oam API_CONN_REFUSED, API_CANCELLED, API_STILL_WORKING, + API_DETACH_FAILURE, API_MAX }; @@ -2432,6 +2433,8 @@ namespace oam void amazonReattach(std::string toPM, dbrootList dbrootConfigList, bool attach = false); void mountDBRoot(dbrootList dbrootConfigList, bool mount = true); + void amazonDetach(dbrootList dbrootConfigList); + void amazonAttach(std::string toPM, dbrootList dbrootConfigList); /** *@brief gluster control diff --git a/procmgr/main.cpp b/procmgr/main.cpp index 118ac0d73..2747fda16 100644 --- a/procmgr/main.cpp +++ b/procmgr/main.cpp @@ -1553,7 +1553,7 @@ void pingDeviceThread() processManager.restartProcessType("WriteEngineServer", moduleName); //set module to enable state - processManager.enableModule(moduleName, oam::AUTO_OFFLINE); + processManager.enableModule(moduleName, oam::AUTO_OFFLINE, true); downActiveOAMModule = false; int retry; @@ -1647,7 +1647,7 @@ void pingDeviceThread() } else //set module to enable state - processManager.enableModule(moduleName, oam::AUTO_OFFLINE); + processManager.enableModule(moduleName, oam::AUTO_OFFLINE, true); //restart module processes int retry = 0; @@ -1922,7 +1922,7 @@ void pingDeviceThread() if ( PrimaryUMModuleName == moduleName ) downPrimaryUM = true; - // if not disabled and amazon, skip + // if disabled, skip if (opState != oam::AUTO_DISABLED ) { //Log failure, issue alarm, set moduleOpState @@ -1968,6 +1968,7 @@ void pingDeviceThread() if ( ( moduleName.find("pm") == 0 && !amazon && ( DBRootStorageType != "internal") ) || ( moduleName.find("pm") == 0 && amazon && downActiveOAMModule ) || ( moduleName.find("pm") == 0 && amazon && AmazonPMFailover == "y") ) { + string error; try { log.writeLog(__LINE__, "Call autoMovePmDbroot", LOG_TYPE_DEBUG); oam.autoMovePmDbroot(moduleName); @@ -1984,6 +1985,23 @@ void pingDeviceThread() { log.writeLog(__LINE__, "EXCEPTION ERROR on autoMovePmDbroot: Caught unknown exception!", LOG_TYPE_ERROR); } + + if ( error == oam.itoa(oam::API_DETACH_FAILURE) ) + { + processManager.setModuleState(moduleName, oam::AUTO_DISABLED); + + // resume the dbrm + oam.dbrmctl("resume"); + log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG); + + //enable query stats + dbrm.setSystemQueryReady(true); + + //set query system state ready + 
processManager.setQuerySystemState(true); + + break; + } } } diff --git a/procmgr/processmanager.cpp b/procmgr/processmanager.cpp index 36893e050..8b01179d2 100755 --- a/procmgr/processmanager.cpp +++ b/procmgr/processmanager.cpp @@ -3438,7 +3438,7 @@ void ProcessManager::recycleProcess(string module, bool enableModule) restartProcessType("ExeMgr"); sleep(1); - restartProcessType("mysql"); + restartProcessType("mysqld"); restartProcessType("WriteEngineServer"); sleep(1); @@ -3457,7 +3457,7 @@ void ProcessManager::recycleProcess(string module, bool enableModule) * purpose: Clear the Disable State on a specified module * ******************************************************************************************/ -int ProcessManager::enableModule(string target, int state) +int ProcessManager::enableModule(string target, int state, bool failover) { Oam oam; ModuleConfig moduleconfig; @@ -3496,7 +3496,8 @@ int ProcessManager::enableModule(string target, int state) setStandbyModule(newStandbyModule); //set recycle process - recycleProcess(target); + if (!failover) + recycleProcess(target); log.writeLog(__LINE__, "enableModule request for " + target + " completed", LOG_TYPE_DEBUG); @@ -4256,7 +4257,7 @@ int ProcessManager::restartProcessType( std::string processName, std::string ski PMwithUM = "n"; } - // If mysql is the processName, then send to modules were ExeMgr is running + // If mysqld is the processName, then send to modules were ExeMgr is running try { oam.getProcessStatus(systemprocessstatus); @@ -4267,7 +4268,7 @@ int ProcessManager::restartProcessType( std::string processName, std::string ski if ( systemprocessstatus.processstatus[i].Module == skipModule ) continue; - if ( processName == "mysql" ) { + if ( processName == "mysqld" ) { if ( systemprocessstatus.processstatus[i].ProcessName == "ExeMgr") { ProcessStatus procstat; oam.getProcessStatus("mysqld", systemprocessstatus.processstatus[i].Module, procstat); @@ -8985,7 +8986,7 @@ int ProcessManager::OAMParentModuleChange() if (systemstatus.SystemOpState == ACTIVE) { log.writeLog(__LINE__, "System Active, restart needed processes", LOG_TYPE_DEBUG); - processManager.restartProcessType("mysql"); + processManager.restartProcessType("mysqld"); processManager.restartProcessType("ExeMgr"); processManager.restartProcessType("WriteEngineServer"); processManager.reinitProcessType("DBRMWorkerNode"); @@ -10099,7 +10100,7 @@ void ProcessManager::stopProcessTypes(bool manualFlag) log.writeLog(__LINE__, "stopProcessTypes Called"); //front-end first - processManager.stopProcessType("mysql", manualFlag); + processManager.stopProcessType("mysqld", manualFlag); processManager.stopProcessType("DMLProc", manualFlag); processManager.stopProcessType("DDLProc", manualFlag); processManager.stopProcessType("ExeMgr", manualFlag); diff --git a/procmgr/processmanager.h b/procmgr/processmanager.h index 55dad53cb..863ad9121 100644 --- a/procmgr/processmanager.h +++ b/procmgr/processmanager.h @@ -307,7 +307,7 @@ public: /** *@brief Enable a specified module */ - int enableModule(std::string target, int state); + int enableModule(std::string target, int state, bool failover = false); /** *@brief Enable a specified module diff --git a/procmon/main.cpp b/procmon/main.cpp index b010b3d74..b4e23a6e1 100644 --- a/procmon/main.cpp +++ b/procmon/main.cpp @@ -695,11 +695,11 @@ int main(int argc, char **argv) if ( ret != 0 ) log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR); - //mysql status monitor thread - if ( ( 
config.ServerInstallType() != oam::INSTALL_COMBINE_DM_UM_PM ) || - (PMwithUM == "y") ) + //mysqld status monitor thread + if ( config.moduleType() == "um" || + ( config.moduleType() == "pm" && config.ServerInstallType() == oam::INSTALL_COMBINE_DM_UM_PM ) || + ( config.moduleType() == "pm" && PMwithUM == "y") ) { - pthread_t mysqlThread; ret = pthread_create (&mysqlThread, NULL, (void*(*)(void*)) &mysqlMonitorThread, NULL); if ( ret != 0 ) @@ -1127,7 +1127,7 @@ static void mysqlMonitorThread(MonitorConfig config) catch(...) {} - sleep(10); + sleep(5); } } diff --git a/procmon/processmonitor.cpp b/procmon/processmonitor.cpp index aa10f2666..91f78e640 100644 --- a/procmon/processmonitor.cpp +++ b/procmon/processmonitor.cpp @@ -457,7 +457,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO log.writeLog(__LINE__, "MSG RECEIVED: Stop process request on " + processName); int requestStatus = API_SUCCESS; - // check for mysql + // check for mysqld if ( processName == "mysqld" ) { try { oam.actionMysqlCalpont(MYSQL_STOP); @@ -520,7 +520,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO msg >> manualFlag; log.writeLog(__LINE__, "MSG RECEIVED: Start process request on: " + processName); - // check for mysql + // check for mysqld if ( processName == "mysqld" ) { try { oam.actionMysqlCalpont(MYSQL_START); @@ -640,7 +640,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO log.writeLog(__LINE__, "MSG RECEIVED: Restart process request on " + processName); int requestStatus = API_SUCCESS; - // check for mysql restart + // check for mysqld restart if ( processName == "mysqld" ) { try { oam.actionMysqlCalpont(MYSQL_RESTART); @@ -869,7 +869,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO log.writeLog(__LINE__, "Error running DBRM clearShm", LOG_TYPE_ERROR); } - //stop the mysql daemon + //stop the mysqld daemon try { oam.actionMysqlCalpont(MYSQL_STOP); log.writeLog(__LINE__, "Stop MySQL Process", LOG_TYPE_DEBUG); @@ -995,12 +995,12 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO system(cmd.c_str()); - //start the mysql daemon + //start the mysqld daemon try { oam.actionMysqlCalpont(MYSQL_START); } catch(...) 
- { // mysql didn't start, return with error + { // mysqld didn't start, return with error log.writeLog(__LINE__, "STARTALL: MySQL failed to start, start-module failure", LOG_TYPE_CRITICAL); ackMsg << (ByteStream::byte) ACK; @@ -1265,7 +1265,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO //send down notification oam.sendDeviceNotification(config.moduleName(), MODULE_DOWN); - //stop the mysql daemon and then columnstore + //stop the mysqld daemon and then columnstore try { oam.actionMysqlCalpont(MYSQL_STOP); } @@ -1444,7 +1444,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO } } - // install mysql rpms if being reconfigured as a um + // install mysqld rpms if being reconfigured as a um if ( reconfigureModuleName.find("um") != string::npos ) { string cmd = startup::StartUp::installDir() + "/bin/post-mysqld-install >> /tmp/rpminstall"; system(cmd.c_str()); diff --git a/utils/common/any.hpp b/utils/common/any.hpp index 5265015f1..be0ca679b 100755 --- a/utils/common/any.hpp +++ b/utils/common/any.hpp @@ -54,15 +54,25 @@ namespace anyimpl template struct big_any_policy : typed_base_any_policy { - virtual void static_delete(void** x) { if (*x) - delete(*reinterpret_cast(x)); *x = NULL; } - virtual void copy_from_value(void const* src, void** dest) { - *dest = new T(*reinterpret_cast(src)); } - virtual void clone(void* const* src, void** dest) { - *dest = new T(**reinterpret_cast(src)); } - virtual void move(void* const* src, void** dest) { - (*reinterpret_cast(dest))->~T(); - **reinterpret_cast(dest) = **reinterpret_cast(src); } + virtual void static_delete(void** x) + { + if (*x) + delete(*reinterpret_cast(x)); + *x = NULL; + } + virtual void copy_from_value(void const* src, void** dest) + { + *dest = new T(*reinterpret_cast(src)); + } + virtual void clone(void* const* src, void** dest) + { + *dest = new T(**reinterpret_cast(src)); + } + virtual void move(void* const* src, void** dest) + { + (*reinterpret_cast(dest))->~T(); + **reinterpret_cast(dest) = **reinterpret_cast(src); + } virtual void* get_value(void** src) { return *src; } }; diff --git a/utils/common/cgroupconfigurator.cpp b/utils/common/cgroupconfigurator.cpp index a4a67d68e..fcf7ef0e9 100644 --- a/utils/common/cgroupconfigurator.cpp +++ b/utils/common/cgroupconfigurator.cpp @@ -19,6 +19,7 @@ #include "configcpp.h" #include "logger.h" #include +#include #include #ifdef _MSC_VER #include "unistd.h" diff --git a/utils/funcexp/func_substring_index.cpp b/utils/funcexp/func_substring_index.cpp index e3ec80b5f..a4b9fb9a6 100644 --- a/utils/funcexp/func_substring_index.cpp +++ b/utils/funcexp/func_substring_index.cpp @@ -71,6 +71,9 @@ std::string Func_substring_index::getStrVal(rowgroup::Row& row, if ( count > (int64_t) end ) return str; + if (( count < 0 ) && ((count * -1) > end)) + return str; + string value = str; if ( count > 0 ) { diff --git a/utils/rowgroup/rowgroup.cpp b/utils/rowgroup/rowgroup.cpp index 48bdd7031..ba64e3596 100755 --- a/utils/rowgroup/rowgroup.cpp +++ b/utils/rowgroup/rowgroup.cpp @@ -79,10 +79,10 @@ StringStore::~StringStore() #endif } -uint32_t StringStore::storeString(const uint8_t *data, uint32_t len) +uint64_t StringStore::storeString(const uint8_t *data, uint32_t len) { MemChunk *lastMC = NULL; - uint32_t ret = 0; + uint64_t ret = 0; empty = false; // At least a NULL is being stored. 
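// Aside, not part of the patch: the rowgroup changes that follow widen string
// offsets from 32 to 64 bits (the top bit now marks entries in the long-string
// list) and store each value's length in a 4-byte prefix in front of its
// bytes, so a Row field only carries the offset and the separate length word
// goes away. A rough sketch of decoding such an offset, with illustrative
// constants (the real chunk size lives in StringStore):

#include <cstdint>

const uint64_t LONG_STRING_BIT  = 0x8000000000000000ULL;  // high bit of the offset
const uint64_t SKETCH_CHUNK_SIZE = 64ULL * 1024;          // illustrative only

struct DecodedRef
{
    bool     longString;   // true -> index into the long-string list
    uint64_t index;        // long-string index, or chunk number
    uint64_t offset;       // byte offset inside the chunk (length prefix first)
};

static DecodedRef decodeOffset(uint64_t off)
{
    DecodedRef ref;
    ref.longString = (off & LONG_STRING_BIT) != 0;
    if (ref.longString)
    {
        ref.index  = off - LONG_STRING_BIT;  // which long-string allocation
        ref.offset = 0;                      // its 4-byte length sits at the start
    }
    else
    {
        ref.index  = off / SKETCH_CHUNK_SIZE;
        ref.offset = off % SKETCH_CHUNK_SIZE;
    }
    return ref;
}

// Reading a value then means copying 4 bytes at the decoded position into a
// uint32_t length; the string bytes follow immediately after that prefix.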
@@ -92,7 +92,7 @@ uint32_t StringStore::storeString(const uint8_t *data, uint32_t len) if ((len == 8 || len == 9) && *((uint64_t *) data) == *((uint64_t *) joblist::CPNULLSTRMARK.c_str())) - return numeric_limits::max(); + return numeric_limits::max(); //@bug6065, make StringStore::storeString() thread safe boost::mutex::scoped_lock lk(fMutex, defer_lock); @@ -102,20 +102,21 @@ uint32_t StringStore::storeString(const uint8_t *data, uint32_t len) if (mem.size() > 0) lastMC = (MemChunk *) mem.back().get(); - if (len >= CHUNK_SIZE) + if ((len+4) >= CHUNK_SIZE) { - shared_array newOne(new uint8_t[len + sizeof(MemChunk)]); + shared_array newOne(new uint8_t[len + sizeof(MemChunk) + 4]); longStrings.push_back(newOne); lastMC = (MemChunk*) longStrings.back().get(); - lastMC->capacity = lastMC->currentSize = len; - memcpy(lastMC->data, data, len); + lastMC->capacity = lastMC->currentSize = len + 4; + memcpy(lastMC->data, &len, 4); + memcpy(lastMC->data + 4, data, len); // High bit to mark a long string - ret = 0x80000000; + ret = 0x8000000000000000; ret += longStrings.size() - 1; } else { - if ((lastMC == NULL) || (lastMC->capacity - lastMC->currentSize < len)) + if ((lastMC == NULL) || (lastMC->capacity - lastMC->currentSize < (len + 4))) { // mem usage debugging //if (lastMC) @@ -130,7 +131,11 @@ uint32_t StringStore::storeString(const uint8_t *data, uint32_t len) ret = ((mem.size()-1) * CHUNK_SIZE) + lastMC->currentSize; - memcpy(&(lastMC->data[lastMC->currentSize]), data, len); + // If this ever happens then we have big problems + if (ret & 0x8000000000000000) + throw logic_error("StringStore memory exceeded."); + memcpy(&(lastMC->data[lastMC->currentSize]), &len, 4); + memcpy(&(lastMC->data[lastMC->currentSize]) + 4, data, len); /* cout << "stored: '" << hex; for (uint32_t i = 0; i < len ; i++) { @@ -138,7 +143,7 @@ uint32_t StringStore::storeString(const uint8_t *data, uint32_t len) } cout << "' at position " << lastMC->currentSize << " len " << len << dec << endl; */ - lastMC->currentSize += len; + lastMC->currentSize += len + 4; } return ret; @@ -146,31 +151,31 @@ uint32_t StringStore::storeString(const uint8_t *data, uint32_t len) void StringStore::serialize(ByteStream &bs) const { - uint32_t i; + uint64_t i; MemChunk *mc; - bs << (uint32_t) mem.size(); + bs << (uint64_t) mem.size(); bs << (uint8_t) empty; for (i = 0; i < mem.size(); i++) { mc = (MemChunk *) mem[i].get(); - bs << (uint32_t) mc->currentSize; + bs << (uint64_t) mc->currentSize; //cout << "serialized " << mc->currentSize << " bytes\n"; bs.append(mc->data, mc->currentSize); } - bs << (uint32_t) longStrings.size(); + bs << (uint64_t) longStrings.size(); for (i = 0; i < longStrings.size(); i++) { mc = (MemChunk *) longStrings[i].get(); - bs << (uint32_t) mc->currentSize; + bs << (uint64_t) mc->currentSize; bs.append(mc->data, mc->currentSize); } } void StringStore::deserialize(ByteStream &bs) { - uint32_t i; - uint32_t count; - uint32_t size; + uint64_t i; + uint64_t count; + uint64_t size; uint8_t *buf; MemChunk *mc; uint8_t tmp8; @@ -718,10 +723,9 @@ bool Row::isNullValue(uint32_t colIndex) const case CalpontSystemCatalog::STRINT: { uint32_t len = getColumnWidth(colIndex); if (inStringTable(colIndex)) { - uint32_t offset, length; - offset = *((uint32_t *) &data[offsets[colIndex]]); - length = *((uint32_t *) &data[offsets[colIndex] + 4]); - return strings->isNullValue(offset, length); + uint64_t offset; + offset = *((uint64_t *) &data[offsets[colIndex]]); + return strings->isNullValue(offset); } if (data[offsets[colIndex]] == 
0) // empty string return true; @@ -757,10 +761,9 @@ bool Row::isNullValue(uint32_t colIndex) const case CalpontSystemCatalog::VARBINARY: { uint32_t pos = offsets[colIndex]; if (inStringTable(colIndex)) { - uint32_t offset, length; - offset = *((uint32_t *) &data[pos]); - length = *((uint32_t *) &data[pos+4]); - return strings->isNullValue(offset, length); + uint64_t offset; + offset = *((uint64_t *) &data[pos]); + return strings->isNullValue(offset); } if (*((uint16_t*) &data[pos]) == 0) return true; @@ -1416,8 +1419,8 @@ RGData RowGroup::duplicate() void Row::setStringField(const std::string &val, uint32_t colIndex) { - uint32_t length; - uint32_t offset; + uint64_t offset; + uint64_t length; //length = strlen(val.c_str()) + 1; length = val.length(); @@ -1426,8 +1429,7 @@ void Row::setStringField(const std::string &val, uint32_t colIndex) if (inStringTable(colIndex)) { offset = strings->storeString((const uint8_t *) val.data(), length); - *((uint32_t *) &data[offsets[colIndex]]) = offset; - *((uint32_t *) &data[offsets[colIndex] + 4]) = length; + *((uint64_t *) &data[offsets[colIndex]]) = offset; // cout << " -- stored offset " << *((uint32_t *) &data[offsets[colIndex]]) // << " length " << *((uint32_t *) &data[offsets[colIndex] + 4]) // << endl; diff --git a/utils/rowgroup/rowgroup.h b/utils/rowgroup/rowgroup.h index 8b5ea75d7..7aca0c93f 100755 --- a/utils/rowgroup/rowgroup.h +++ b/utils/rowgroup/rowgroup.h @@ -92,13 +92,14 @@ public: StringStore(); virtual ~StringStore(); - inline std::string getString(uint32_t offset, uint32_t length) const; - uint32_t storeString(const uint8_t *data, uint32_t length); //returns the offset - inline const uint8_t * getPointer(uint32_t offset) const; + inline std::string getString(uint64_t offset) const; + uint64_t storeString(const uint8_t *data, uint32_t length); //returns the offset + inline const uint8_t * getPointer(uint64_t offset) const; + inline uint32_t getStringLength(uint64_t offset); inline bool isEmpty() const; inline uint64_t getSize() const; - inline bool isNullValue(uint32_t offset, uint32_t length) const; - inline bool equals(const std::string &str, uint32_t offset, uint32_t length) const; + inline bool isNullValue(uint64_t offset) const; + inline bool equals(const std::string &str, uint64_t offset) const; void clear(); @@ -541,9 +542,8 @@ inline bool Row::equals(uint64_t val, uint32_t colIndex) const inline bool Row::equals(const std::string &val, uint32_t colIndex) const { if (inStringTable(colIndex)) { - uint32_t offset = *((uint32_t *) &data[offsets[colIndex]]); - uint32_t length = *((uint32_t *) &data[offsets[colIndex] + 4]); - return strings->equals(val, offset, length); + uint64_t offset = *((uint64_t *) &data[offsets[colIndex]]); + return strings->equals(val, offset); } else return (strncmp(val.c_str(), (char *) &data[offsets[colIndex]], getColumnWidth(colIndex)) == 0); @@ -609,28 +609,27 @@ inline int64_t Row::getIntField(uint32_t colIndex) const inline const uint8_t * Row::getStringPointer(uint32_t colIndex) const { if (inStringTable(colIndex)) - return strings->getPointer(*((uint32_t *) &data[offsets[colIndex]])); + return strings->getPointer(*((uint64_t *) &data[offsets[colIndex]])); return &data[offsets[colIndex]]; } inline uint32_t Row::getStringLength(uint32_t colIndex) const { if (inStringTable(colIndex)) - return *((uint32_t *) &data[offsets[colIndex] + 4]); + return strings->getStringLength(*((uint64_t *) &data[offsets[colIndex]])); return strnlen((char *) &data[offsets[colIndex]], getColumnWidth(colIndex)); } inline 
void Row::setStringField(const uint8_t *strdata, uint32_t length, uint32_t colIndex) { - uint32_t offset; + uint64_t offset; if (length > getColumnWidth(colIndex)) length = getColumnWidth(colIndex); if (inStringTable(colIndex)) { offset = strings->storeString(strdata, length); - *((uint32_t *) &data[offsets[colIndex]]) = offset; - *((uint32_t *) &data[offsets[colIndex] + 4]) = length; + *((uint64_t *) &data[offsets[colIndex]]) = offset; // cout << " -- stored offset " << *((uint32_t *) &data[offsets[colIndex]]) // << " length " << *((uint32_t *) &data[offsets[colIndex] + 4]) // << endl; @@ -645,8 +644,7 @@ inline void Row::setStringField(const uint8_t *strdata, uint32_t length, uint32_ inline std::string Row::getStringField(uint32_t colIndex) const { if (inStringTable(colIndex)) - return strings->getString(*((uint32_t *) &data[offsets[colIndex]]), - *((uint32_t *) &data[offsets[colIndex] + 4])); + return strings->getString(*((uint64_t *) &data[offsets[colIndex]])); // Not all CHAR/VARCHAR are NUL terminated so use length return std::string((char *) &data[offsets[colIndex]], strnlen((char *) &data[offsets[colIndex]], getColumnWidth(colIndex))); @@ -662,21 +660,21 @@ inline std::string Row::getVarBinaryStringField(uint32_t colIndex) const inline uint32_t Row::getVarBinaryLength(uint32_t colIndex) const { if (inStringTable(colIndex)) - return *((uint32_t *) &data[offsets[colIndex] + 4]); + return strings->getStringLength(*((uint64_t *) &data[offsets[colIndex]]));; return *((uint16_t*) &data[offsets[colIndex]]); } inline const uint8_t* Row::getVarBinaryField(uint32_t colIndex) const { if (inStringTable(colIndex)) - return strings->getPointer(*((uint32_t *) &data[offsets[colIndex]])); + return strings->getPointer(*((uint64_t *) &data[offsets[colIndex]])); return &data[offsets[colIndex] + 2]; } inline const uint8_t* Row::getVarBinaryField(uint32_t& len, uint32_t colIndex) const { if (inStringTable(colIndex)) { - len = *((uint32_t *) &data[offsets[colIndex] + 4]); + len = strings->getStringLength(*((uint64_t *) &data[offsets[colIndex]])); return getVarBinaryField(colIndex); } else { @@ -854,9 +852,8 @@ inline void Row::setVarBinaryField(const uint8_t *val, uint32_t len, uint32_t co if (len > getColumnWidth(colIndex)) len = getColumnWidth(colIndex); if (inStringTable(colIndex)) { - uint32_t offset = strings->storeString(val, len); - *((uint32_t *) &data[offsets[colIndex]]) = offset; - *((uint32_t *) &data[offsets[colIndex] + 4]) = len; + uint64_t offset = strings->storeString(val, len); + *((uint64_t *) &data[offsets[colIndex]]) = offset; } else { *((uint16_t*) &data[offsets[colIndex]]) = len; @@ -1535,49 +1532,53 @@ inline void copyRow(const Row &in, Row *out) copyRow(in, out, std::min(in.getColumnCount(), out->getColumnCount())); } -inline std::string StringStore::getString(uint32_t off, uint32_t len) const +inline std::string StringStore::getString(uint64_t off) const { - if (off == std::numeric_limits::max()) + uint32_t length; + if (off == std::numeric_limits::max()) return joblist::CPNULLSTRMARK; MemChunk *mc; - if (off & 0x80000000) + if (off & 0x8000000000000000) { - off = off - 0x80000000; + off = off - 0x8000000000000000; if (longStrings.size() <= off) return joblist::CPNULLSTRMARK; mc = (MemChunk*) longStrings[off].get(); - return std::string((char *) mc->data, len); + memcpy(&length, mc->data, 4); + return std::string((char *) mc->data+4, length); } - uint32_t chunk = off / CHUNK_SIZE; - uint32_t offset = off % CHUNK_SIZE; + uint64_t chunk = off / CHUNK_SIZE; + uint64_t offset = off % 
CHUNK_SIZE; // this has to handle uninitialized data as well. If it's uninitialized it doesn't matter // what gets returned, it just can't go out of bounds. if (mem.size() <= chunk) return joblist::CPNULLSTRMARK; mc = (MemChunk *) mem[chunk].get(); - if ((offset + len) > mc->currentSize) + + memcpy(&length, &mc->data[offset], 4); + if ((offset + length) > mc->currentSize) return joblist::CPNULLSTRMARK; - - return std::string((char *) &(mc->data[offset]), len); + + return std::string((char *) &(mc->data[offset])+4, length); } -inline const uint8_t * StringStore::getPointer(uint32_t off) const +inline const uint8_t * StringStore::getPointer(uint64_t off) const { - if (off == std::numeric_limits::max()) + if (off == std::numeric_limits::max()) return (const uint8_t *) joblist::CPNULLSTRMARK.c_str(); - uint32_t chunk = off / CHUNK_SIZE; - uint32_t offset = off % CHUNK_SIZE; + uint64_t chunk = off / CHUNK_SIZE; + uint64_t offset = off % CHUNK_SIZE; MemChunk *mc; - if (off & 0x80000000) + if (off & 0x8000000000000000) { - off = off - 0x80000000; + off = off - 0x8000000000000000; if (longStrings.size() <= off) return (const uint8_t *) joblist::CPNULLSTRMARK.c_str(); mc = (MemChunk*) longStrings[off].get(); - return mc->data; + return mc->data+4; } // this has to handle uninitialized data as well. If it's uninitialized it doesn't matter // what gets returned, it just can't go out of bounds. @@ -1587,19 +1588,17 @@ inline const uint8_t * StringStore::getPointer(uint32_t off) const if (offset > mc->currentSize) return (const uint8_t *) joblist::CPNULLSTRMARK.c_str(); - return &(mc->data[offset]); + return &(mc->data[offset]) + 4; } -inline bool StringStore::isNullValue(uint32_t off, uint32_t len) const +inline bool StringStore::isNullValue(uint64_t off) const { - if (off == std::numeric_limits::max() || len == 0) + uint32_t length; + if (off == std::numeric_limits::max()) return true; - if (len < 8) - return false; - // Long strings won't be NULL - if (off & 0x80000000) + if (off & 0x8000000000000000) return false; uint32_t chunk = off / CHUNK_SIZE; @@ -1609,31 +1608,38 @@ inline bool StringStore::isNullValue(uint32_t off, uint32_t len) const return true; mc = (MemChunk *) mem[chunk].get(); - if ((offset + len) > mc->currentSize) + memcpy(&length, &mc->data[offset], 4); + if (length == 0) return true; - if (mc->data[offset] == 0) // "" = NULL string for some reason... + if (length < 8) + return false; + if ((offset + length) > mc->currentSize) return true; - return (*((uint64_t *) &mc->data[offset]) == *((uint64_t *) joblist::CPNULLSTRMARK.c_str())); + if (mc->data[offset+4] == 0) // "" = NULL string for some reason... 
+ return true; + return (*((uint64_t *) &mc->data[offset]+4) == *((uint64_t *) joblist::CPNULLSTRMARK.c_str())); } -inline bool StringStore::equals(const std::string &str, uint32_t off, uint32_t len) const +inline bool StringStore::equals(const std::string &str, uint64_t off) const { - if (off == std::numeric_limits::max() || len == 0) + uint32_t length; + if (off == std::numeric_limits::max()) return str == joblist::CPNULLSTRMARK; MemChunk *mc; - if (off & 0x80000000) + if (off & 0x8000000000000000) { - if (longStrings.size() <= (off - 0x80000000)) + if (longStrings.size() <= (off - 0x8000000000000000)) return false; - mc = (MemChunk *) longStrings[off - 0x80000000].get(); + mc = (MemChunk *) longStrings[off - 0x8000000000000000].get(); + memcpy(&length, mc->data, 4); // Not sure if this check it needed, but adds safety - if (len > mc->currentSize) + if (length > mc->currentSize) return false; - return (strncmp(str.c_str(), (const char*) mc->data, len) == 0); + return (strncmp(str.c_str(), (const char*) mc->data+4, length) == 0); } uint32_t chunk = off / CHUNK_SIZE; uint32_t offset = off % CHUNK_SIZE; @@ -1641,10 +1647,37 @@ inline bool StringStore::equals(const std::string &str, uint32_t off, uint32_t l return false; mc = (MemChunk *) mem[chunk].get(); - if ((offset + len) > mc->currentSize) + memcpy(&length, &mc->data[offset], 4); + if ((offset + length) > mc->currentSize) return false; - return (strncmp(str.c_str(), (const char *) &mc->data[offset], len) == 0); + return (strncmp(str.c_str(), (const char *) &mc->data[offset]+4, length) == 0); +} +inline uint32_t StringStore::getStringLength(uint64_t off) +{ + uint32_t length; + MemChunk *mc; + if (off == std::numeric_limits::max()) + return 0; + if (off & 0x8000000000000000) + { + off = off - 0x8000000000000000; + if (longStrings.size() <= off) + return 0; + mc = (MemChunk*) longStrings[off].get(); + memcpy(&length, mc->data, 4); + } + else + { + uint64_t chunk = off / CHUNK_SIZE; + uint64_t offset = off % CHUNK_SIZE; + if (mem.size() <= chunk) + return 0; + mc = (MemChunk *) mem[chunk].get(); + memcpy(&length, &mc->data[offset], 4); + } + + return length; } inline bool StringStore::isEmpty() const diff --git a/utils/threadpool/threadpool.cpp b/utils/threadpool/threadpool.cpp index 197bdedd9..d903f9892 100644 --- a/utils/threadpool/threadpool.cpp +++ b/utils/threadpool/threadpool.cpp @@ -21,6 +21,7 @@ * ***********************************************************************/ #include +#include using namespace std; #include "messageobj.h" diff --git a/writeengine/splitter/we_cmdargs.h b/writeengine/splitter/we_cmdargs.h index 96e06a4bc..803f77c54 100644 --- a/writeengine/splitter/we_cmdargs.h +++ b/writeengine/splitter/we_cmdargs.h @@ -77,6 +77,7 @@ class WECmdArgs char getDelimChar() { return fColDelim; } ImportDataMode getImportDataMode() const { return fImportDataMode; } bool getConsoleLog() { return fConsoleLog; } + int getReadBufSize() { return fReadBufSize; } bool isCpimportInvokeMode(){return (fBlockMode3)? 
diff --git a/utils/threadpool/threadpool.cpp b/utils/threadpool/threadpool.cpp
index 197bdedd9..d903f9892 100644
--- a/utils/threadpool/threadpool.cpp
+++ b/utils/threadpool/threadpool.cpp
@@ -21,6 +21,7 @@
  *
  ***********************************************************************/
 #include
+#include
 using namespace std;
 #include "messageobj.h"
diff --git a/writeengine/splitter/we_cmdargs.h b/writeengine/splitter/we_cmdargs.h
index 96e06a4bc..803f77c54 100644
--- a/writeengine/splitter/we_cmdargs.h
+++ b/writeengine/splitter/we_cmdargs.h
@@ -77,6 +77,7 @@ class WECmdArgs
     char getDelimChar() { return fColDelim; }
     ImportDataMode getImportDataMode() const { return fImportDataMode; }
     bool getConsoleLog() { return fConsoleLog; }
+    int getReadBufSize() { return fReadBufSize; }
     bool isCpimportInvokeMode(){return (fBlockMode3)? false : fCpiInvoke;}
     bool isQuiteMode() const { return fQuiteMode; }
diff --git a/writeengine/splitter/we_filereadthread.cpp b/writeengine/splitter/we_filereadthread.cpp
index 6e4fcb4a7..185557965 100644
--- a/writeengine/splitter/we_filereadthread.cpp
+++ b/writeengine/splitter/we_filereadthread.cpp
@@ -87,6 +87,15 @@ WEFileReadThread::WEFileReadThread(WESDHandler& aSdh):fSdh(aSdh),
 {
     //TODO batch qty to get from config
     fBatchQty = 10000;
+    if (fSdh.getReadBufSize() < DEFAULTBUFFSIZE)
+    {
+        fBuffSize = DEFAULTBUFFSIZE;
+    }
+    else
+    {
+        fBuffSize = fSdh.getReadBufSize();
+    }
+    fBuff = new char [fBuffSize];
 }
@@ -106,6 +115,7 @@ WEFileReadThread::~WEFileReadThread()
         delete fpThread;
     }
     fpThread=0;
+    delete []fBuff;
     //cout << "WEFileReadThread destructor called" << endl;
 }
@@ -330,16 +340,16 @@ unsigned int WEFileReadThread::readDataFile(messageqcpp::SBS& Sbs)
         if(fEnclEsc)
         {
             //pStart = aBuff;
-            aLen = getNextRow(fInFile, fBuff, sizeof(fBuff)-1);
+            aLen = getNextRow(fInFile, fBuff, fBuffSize-1);
         }
         else
         {
-            fInFile.getline(fBuff, sizeof(fBuff)-1);
+            fInFile.getline(fBuff, fBuffSize-1);
             aLen=fInFile.gcount();
         }
         ////aLen chars incl \n, Therefore aLen-1; '<<' oper won't go past it
         //cout << "Data Length " << aLen << endl;
-        if((aLen < (sizeof(fBuff)-2)) && (aLen>0))
+        if((aLen < (fBuffSize-2)) && (aLen>0))
         {
             fBuff[aLen-1] = '\n';
             fBuff[aLen]=0;
@@ -348,7 +358,7 @@ unsigned int WEFileReadThread::readDataFile(messageqcpp::SBS& Sbs)
             aIdx++;
             if(fSdh.getDebugLvl()>2)
                 cout << "File data line = " << aIdx << endl;
         }
-        else if(aLen>=sizeof(fBuff)-2)    //Didn't hit delim; BIG ROW
+        else if(aLen>=fBuffSize-2)    //Didn't hit delim; BIG ROW
         {
             cout <<"Bad Row data " << endl;
             cout << fBuff << endl;
diff --git a/writeengine/splitter/we_filereadthread.h b/writeengine/splitter/we_filereadthread.h
index 623184e8d..f9a486d5c 100644
--- a/writeengine/splitter/we_filereadthread.h
+++ b/writeengine/splitter/we_filereadthread.h
@@ -98,7 +98,7 @@ public:
     void add2InputDataFileList(std::string& FileName);
 private:
-    enum { MAXBUFFSIZE=1024*1024 };
+    enum { DEFAULTBUFFSIZE=1024*1024 };
     // don't allow anyone else to set
     void setTgtPmId(unsigned int fTgtPmId) { this->fTgtPmId = fTgtPmId; }
@@ -120,7 +120,8 @@ private:
     char fEncl;      // Encl char
     char fEsc;       // Esc char
     char fDelim;     // Column Delimit char
-    char fBuff[MAXBUFFSIZE];    // main data buffer
+    char* fBuff;                // main data buffer
+    int fBuffSize;
 };
 } /* namespace WriteEngine */
diff --git a/writeengine/splitter/we_sdhandler.cpp b/writeengine/splitter/we_sdhandler.cpp
index 56ace80dc..78019b338 100644
--- a/writeengine/splitter/we_sdhandler.cpp
+++ b/writeengine/splitter/we_sdhandler.cpp
@@ -2301,6 +2301,13 @@ char WESDHandler::getEscChar()
 //------------------------------------------------------------------------------
+int WESDHandler::getReadBufSize()
+{
+    return fRef.fCmdArgs.getReadBufSize();
+}
+
+//------------------------------------------------------------------------------
+
 char WESDHandler::getDelimChar()
 {
     return fRef.fCmdArgs.getDelimChar();
diff --git a/writeengine/splitter/we_sdhandler.h b/writeengine/splitter/we_sdhandler.h
index 2d4b20cc2..ff6bc829e 100644
--- a/writeengine/splitter/we_sdhandler.h
+++ b/writeengine/splitter/we_sdhandler.h
@@ -149,6 +149,7 @@ public:
     char getEscChar();
     char getDelimChar();
     bool getConsoleLog();
+    int getReadBufSize();
     ImportDataMode getImportDataMode() const;
     void sysLog(const logging::Message::Args& msgArgs,
         logging::LOG_TYPE logType, logging::Message::MessageID msgId);
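Note on the splitter hunks above: the fixed 1 MB stack buffer becomes a heap buffer allocated in the constructor, sized from the value reported by getReadBufSize() but never smaller than DEFAULTBUFFSIZE. A hedged standalone sketch of that sizing rule follows; pickBuffSize is an invented helper, not part of the patch.

// buffer_sizing.cpp -- sketch of the "never smaller than the default" buffer sizing rule.
#include <algorithm>
#include <iostream>

enum { DEFAULTBUFFSIZE = 1024 * 1024 };   // same default as the splitter

// Pick the working buffer size from a user-supplied value (hypothetical helper).
int pickBuffSize(int requested)
{
    // Requests below the default (including 0 / unset) fall back to the default.
    return std::max(requested, static_cast<int>(DEFAULTBUFFSIZE));
}

int main()
{
    std::cout << pickBuffSize(0) << "\n";                // 1048576
    std::cout << pickBuffSize(8 * 1024 * 1024) << "\n";  // 8388608
    return 0;
}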
diff --git a/writeengine/wrapper/we_colop.cpp b/writeengine/wrapper/we_colop.cpp
index ac13a7bf0..fdaceef3e 100644
--- a/writeengine/wrapper/we_colop.cpp
+++ b/writeengine/wrapper/we_colop.cpp
@@ -208,7 +208,7 @@ int ColumnOp::allocRowId(const TxnID& txnid, bool useStartingExtent,
             //Find out where the rest rows go
             BRM::LBID_t startLbid;
             //need to put in a loop until newExtent is true
-            newExtent = dbRootExtentTrackers[0]->nextSegFile(dbRoot, partition, segment, newHwm, startLbid);
+            newExtent = dbRootExtentTrackers[column.colNo]->nextSegFile(dbRoot, partition, segment, newHwm, startLbid);
             TableMetaData* tableMetaData= TableMetaData::makeTableMetaData(tableOid);
             while (!newExtent)
             {
@@ -223,7 +223,7 @@ int ColumnOp::allocRowId(const TxnID& txnid, bool useStartingExtent,
                 for (i=0; i < dbRootExtentTrackers.size(); i++)
                 {
-                    if (i != 0)
+                    if (i != column.colNo)
                         dbRootExtentTrackers[i]->nextSegFile(dbRoot, partition, segment, newHwm, startLbid);
                 // Round up HWM to the end of the current extent
@@ -278,7 +278,7 @@ int ColumnOp::allocRowId(const TxnID& txnid, bool useStartingExtent,
                     }
                     tableMetaData->setColExtsInfo(newColStructList[i].dataOid, aColExtsInfo);
                 }
-                newExtent = dbRootExtentTrackers[0]->nextSegFile(dbRoot, partition, segment, newHwm, startLbid);
+                newExtent = dbRootExtentTrackers[column.colNo]->nextSegFile(dbRoot, partition, segment, newHwm, startLbid);
             }
         }
@@ -297,7 +297,7 @@ int ColumnOp::allocRowId(const TxnID& txnid, bool useStartingExtent,
             }
             rc = BRMWrapper::getInstance()->allocateStripeColExtents(cols, dbRoot, partition, segment, extents);
-            newHwm = extents[0].startBlkOffset;
+            newHwm = extents[column.colNo].startBlkOffset;
             if (rc != NO_ERROR)
                 return rc;
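Note on the writeengine.cpp hunks below: the MCOL-984 loop they relocate simply selects the index of the narrowest column so that, per the patch's own comment, all HWMs are incremented by the operation. A standalone sketch of that selection follows; ColStruct here is a simplified stand-in and smallestColumn is an invented helper.

// smallest_column.cpp -- sketch of the MCOL-984 selection: pick the narrowest column.
#include <cstdint>
#include <iostream>
#include <vector>

struct ColStruct { int32_t colWidth; };    // simplified stand-in for the real ColStruct

// Return the index of the smallest-width column (0 if the list is empty).
size_t smallestColumn(const std::vector<ColStruct>& cols)
{
    size_t colId = 0;
    int32_t lowColLen = 8192;              // same sentinel the patch uses
    for (size_t i = 0; i < cols.size(); i++)
    {
        if (cols[i].colWidth < lowColLen)
        {
            colId = i;
            lowColLen = cols[i].colWidth;
        }
    }
    return colId;
}

int main()
{
    std::vector<ColStruct> cols = { {8}, {4}, {16} };
    std::cout << smallestColumn(cols) << std::endl;   // prints 1
    return 0;
}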
diff --git a/writeengine/wrapper/writeengine.cpp b/writeengine/wrapper/writeengine.cpp
index 50d846e7c..afea06fee 100644
--- a/writeengine/wrapper/writeengine.cpp
+++ b/writeengine/wrapper/writeengine.cpp
@@ -1505,6 +1505,19 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
     for (i = 0; i < colStructList.size(); i++)
         Convertor::convertColType(&colStructList[i]);
+    // MCOL-984: find the smallest column width to calculate the RowID from so
+    // that all HWMs will be incremented by this operation
+    int32_t lowColLen = 8192;
+    int32_t colId = 0;
+    for (uint32_t colIt = 0; colIt < colStructList.size(); colIt++)
+    {
+        if (colStructList[colIt].colWidth < lowColLen)
+        {
+            colId = colIt;
+            lowColLen = colStructList[colId].colWidth;
+        }
+    }
+
     // rc = checkValid(txnid, colStructList, colValueList, ridList);
     // if (rc != NO_ERROR)
     //     return rc;
@@ -1531,8 +1544,8 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
     //--------------------------------------------------------------------------
     if (isFirstBatchPm)
     {
-        currentDBrootIdx = dbRootExtentTrackers[0]->getCurrentDBRootIdx();
-        extentInfo = dbRootExtentTrackers[0]->getDBRootExtentList();
+        currentDBrootIdx = dbRootExtentTrackers[colId]->getCurrentDBRootIdx();
+        extentInfo = dbRootExtentTrackers[colId]->getDBRootExtentList();
         dbRoot = extentInfo[currentDBrootIdx].fDbRoot;
         partitionNum = extentInfo[currentDBrootIdx].fPartition;
@@ -1698,7 +1711,7 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
     } // if (isFirstBatchPm)
     else //get the extent info from tableMetaData
     {
-        ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[0].dataOid);
+        ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid);
         ColExtsInfo::iterator it = aColExtsInfo.begin();
         while (it != aColExtsInfo.end())
         {
@@ -1730,20 +1743,7 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
     //--------------------------------------------------------------------------
     // allocate row id(s)
     //--------------------------------------------------------------------------
-
-    // MCOL-984: find the smallest column width to calculate the RowID from so
-    // that all HWMs will be incremented by this operation
-    int32_t lowColLen = 8192;
-    int32_t colId = 0;
-    for (uint32_t colIt = 0; colIt < colStructList.size(); colIt++)
-    {
-        if (colStructList[colIt].colWidth < lowColLen)
-        {
-            colId = colIt;
-            lowColLen = colStructList[colId].colWidth;
-            curColStruct = colStructList[colId];
-        }
-    }
+    curColStruct = colStructList[colId];
     colOp = m_colOp[op(curColStruct.fCompressionType)];
     colOp->initColumn(curCol);
@@ -1765,7 +1765,7 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
     if (it != aColExtsInfo.end())
     {
         hwm = it->hwm;
-        //cout << "Got from colextinfo hwm for oid " << colStructList[0].dataOid << " is " << hwm << " and seg is " << colStructList[0].fColSegment << endl;
+        //cout << "Got from colextinfo hwm for oid " << colStructList[colId].dataOid << " is " << hwm << " and seg is " << colStructList[colId].fColSegment << endl;
     }
     oldHwm = hwm; //Save this info for rollback
@@ -2008,7 +2008,6 @@ timer.stop("tokenize");
         if (it != aColExtsInfo.end()) //update hwm info
         {
             oldHwm = it->hwm;
-        }
         // save hwm for the old extent
         colWidth = colStructList[i].colWidth;
@@ -2032,6 +2031,7 @@ timer.stop("tokenize");
             else
                 return ERR_INVALID_PARAM;
+        }
         //update hwm for the new extent
         if (newExtent)
         {
@@ -2043,6 +2043,7 @@ timer.stop("tokenize");
                     break;
                 it++;
             }
+            colWidth = newColStructList[i].colWidth;
             succFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK/colWidth, colWidth, curFbo, curBio);
             if (succFlag)
             {
@@ -2107,6 +2108,9 @@ timer.start("writeColumnRec");
                         curFbo));
                 }
             }
+            else
+                return ERR_INVALID_PARAM;
+
         }
     }
     // If we create a new extent for this batch
     for (unsigned i = 0; i < newColStructList.size(); i++)
@@ -2123,7 +2127,8 @@ timer.start("writeColumnRec");
                     curFbo));
             }
         }
-        }
+            else
+                return ERR_INVALID_PARAM;
     }
     if (lbids.size() > 0)
@@ -4604,7 +4609,7 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid,
                                              bool versioning)
 {
     int rc = 0;
-    void* valArray;
+    void* valArray = NULL;
     string segFile;
     Column curCol;
     ColStructList::size_type totalColumn;
@@ -4629,132 +4634,135 @@ StopWatch timer;
         totalRow2 = 0;
     }
-    valArray = malloc(sizeof(uint64_t) * totalRow1);
-
-    if (totalRow1 == 0)
+    // It is possible totalRow1 is zero but totalRow2 has values
+    if ((totalRow1 == 0) && (totalRow2 == 0))
         return rc;
     TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
-    for (i = 0; i < totalColumn; i++)
+    if (totalRow1)
     {
-        //@Bug 2205 Check if all rows go to the new extent
-        //Write the first batch
-        RID * firstPart = rowIdArray;
-        ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)];
-
-        // set params
-        colOp->initColumn(curCol);
-        // need to pass real dbRoot, partition, and segment to setColParam
-        colOp->setColParam(curCol, 0, colStructList[i].colWidth,
-            colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid,
-            colStructList[i].fCompressionType, colStructList[i].fColDbRoot,
-            colStructList[i].fColPartition, colStructList[i].fColSegment);
-
-        ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid);
-        ColExtsInfo::iterator it = aColExtsInfo.begin();
-        while (it != aColExtsInfo.end())
        {
-            if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment))
-                break;
-            it++;
-        }
+            //@Bug 2205 Check if all rows go to the new extent
+            //Write the first batch
+            RID * firstPart = rowIdArray;
+            ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)];
-        if (it == aColExtsInfo.end()) //add this one to the list
-        {
-            ColExtInfo aExt;
-            aExt.dbRoot =colStructList[i].fColDbRoot;
-            aExt.partNum = colStructList[i].fColPartition;
-            aExt.segNum = colStructList[i].fColSegment;
-            aExt.compType = colStructList[i].fCompressionType;
-            aColExtsInfo.push_back(aExt);
-            aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
-        }
+            // set params
+            colOp->initColumn(curCol);
+            // need to pass real dbRoot, partition, and segment to setColParam
+            colOp->setColParam(curCol, 0, colStructList[i].colWidth,
+                colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid,
+                colStructList[i].fCompressionType, colStructList[i].fColDbRoot,
+                colStructList[i].fColPartition, colStructList[i].fColSegment);
-        rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
-        if (rc != NO_ERROR)
-            break;
-
-        // handling versioning
-        vector rangeList;
-        if (versioning)
-        {
-            rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i],
-                colStructList[i].colWidth, totalRow1, firstPart, rangeList);
-            if (rc != NO_ERROR) {
-                if (colStructList[i].fCompressionType == 0)
-                {
-                    curCol.dataFile.pFile->flush();
-                }
-
-                BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
-                break;
-            }
-        }
-
-        //totalRow1 -= totalRow2;
-        // have to init the size here
-        // nullArray = (bool*) malloc(sizeof(bool) * totalRow);
-        uint8_t tmp8;
-        uint16_t tmp16;
-        uint32_t tmp32;
-        for (size_t j = 0; j < totalRow1; j++)
-        {
-            uint64_t curValue = colValueList[((totalRow1 + totalRow2)*i) + j];
-            switch (colStructList[i].colType)
+            ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid);
+            ColExtsInfo::iterator it = aColExtsInfo.begin();
+            while (it != aColExtsInfo.end())
             {
-                case WriteEngine::WR_VARBINARY : // treat same as char for now
-                case WriteEngine::WR_CHAR:
-                case WriteEngine::WR_BLOB:
-                case WriteEngine::WR_TEXT:
-                    ((uint64_t*)valArray)[j] = curValue;
-                    break;
-                case WriteEngine::WR_INT:
-                case WriteEngine::WR_UINT:
-                case WriteEngine::WR_FLOAT:
-                    tmp32 = curValue;
-                    ((uint32_t*)valArray)[j] = tmp32;
-                    break;
-                case WriteEngine::WR_ULONGLONG:
-                case WriteEngine::WR_LONGLONG:
-                case WriteEngine::WR_DOUBLE:
-                case WriteEngine::WR_TOKEN:
-                    ((uint64_t*)valArray)[j] = curValue;
-                    break;
-                case WriteEngine::WR_BYTE:
-                case WriteEngine::WR_UBYTE:
-                    tmp8 = curValue;
-                    ((uint8_t*)valArray)[j] = tmp8;
-                    break;
-                case WriteEngine::WR_SHORT:
-                case WriteEngine::WR_USHORT:
-                    tmp16 = curValue;
-                    ((uint16_t*)valArray)[j] = tmp16;
+                if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment))
                    break;
+                it++;
            }
+
+            if (it == aColExtsInfo.end()) //add this one to the list
+            {
+                ColExtInfo aExt;
+                aExt.dbRoot =colStructList[i].fColDbRoot;
+                aExt.partNum = colStructList[i].fColPartition;
+                aExt.segNum = colStructList[i].fColSegment;
+                aExt.compType = colStructList[i].fCompressionType;
+                aColExtsInfo.push_back(aExt);
+                aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
+            }
+
+            rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
+            if (rc != NO_ERROR)
+                break;
+
+            // handling versioning
+            vector rangeList;
+            if (versioning)
+            {
+                rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i],
+                    colStructList[i].colWidth, totalRow1, firstPart, rangeList);
+                if (rc != NO_ERROR) {
+                    if (colStructList[i].fCompressionType == 0)
+                    {
+                        curCol.dataFile.pFile->flush();
+                    }
+
+                    BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
+                    break;
+                }
+            }
+
+            //totalRow1 -= totalRow2;
+            // have to init the size here
+            // nullArray = (bool*) malloc(sizeof(bool) * totalRow);
+            uint8_t tmp8;
+            uint16_t tmp16;
+            uint32_t tmp32;
+            for (size_t j = 0; j < totalRow1; j++)
+            {
+                uint64_t curValue = colValueList[((totalRow1 + totalRow2)*i) + j];
+                switch (colStructList[i].colType)
+                {
+                    case WriteEngine::WR_VARBINARY : // treat same as char for now
+                    case WriteEngine::WR_CHAR:
+                    case WriteEngine::WR_BLOB:
+                    case WriteEngine::WR_TEXT:
+                        ((uint64_t*)valArray)[j] = curValue;
+                        break;
+                    case WriteEngine::WR_INT:
+                    case WriteEngine::WR_UINT:
+                    case WriteEngine::WR_FLOAT:
+                        tmp32 = curValue;
+                        ((uint32_t*)valArray)[j] = tmp32;
+                        break;
+                    case WriteEngine::WR_ULONGLONG:
+                    case WriteEngine::WR_LONGLONG:
+                    case WriteEngine::WR_DOUBLE:
+                    case WriteEngine::WR_TOKEN:
+                        ((uint64_t*)valArray)[j] = curValue;
+                        break;
+                    case WriteEngine::WR_BYTE:
+                    case WriteEngine::WR_UBYTE:
+                        tmp8 = curValue;
+                        ((uint8_t*)valArray)[j] = tmp8;
+                        break;
+                    case WriteEngine::WR_SHORT:
+                    case WriteEngine::WR_USHORT:
+                        tmp16 = curValue;
+                        ((uint16_t*)valArray)[j] = tmp16;
+                        break;
+                }
+            }
+
+
+#ifdef PROFILE
+            timer.start("writeRow ");
+#endif
+            rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray);
+#ifdef PROFILE
+            timer.stop("writeRow ");
+#endif
+            colOp->closeColumnFile(curCol);
+
+            if (versioning)
+                BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
+
+            // check error
+            if (rc != NO_ERROR)
+                break;
+
+        } // end of for (i = 0
+        if (valArray != NULL)
+        {
+            free(valArray);
+            valArray = NULL;
        }
-
-
-#ifdef PROFILE
-timer.start("writeRow ");
-#endif
-        rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray);
-#ifdef PROFILE
-timer.stop("writeRow ");
-#endif
-        colOp->closeColumnFile(curCol);
-
-        if (versioning)
-            BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
-
-        // check error
-        if (rc != NO_ERROR)
-            break;
-
-    } // end of for (i = 0
-    if (valArray != NULL)
-    {
-        free(valArray);
-        valArray = NULL;
     }
     // MCOL-1176 - Write second extent