diff --git a/VERSION b/VERSION index f5d54e457..e0328ce70 100644 --- a/VERSION +++ b/VERSION @@ -1,4 +1,4 @@ COLUMNSTORE_VERSION_MAJOR=1 -COLUMNSTORE_VERSION_MINOR=3 +COLUMNSTORE_VERSION_MINOR=4 COLUMNSTORE_VERSION_PATCH=0 COLUMNSTORE_VERSION_RELEASE=1 diff --git a/dbcon/ddlpackageproc/altertableprocessor.cpp b/dbcon/ddlpackageproc/altertableprocessor.cpp index cc14424e2..989dffce2 100644 --- a/dbcon/ddlpackageproc/altertableprocessor.cpp +++ b/dbcon/ddlpackageproc/altertableprocessor.cpp @@ -2069,7 +2069,8 @@ void AlterTableProcessor::tableComment(uint32_t sessionID, execplan::CalpontSyst } else { - throw std::runtime_error("Invalid table comment"); + // Generic table comment, we don't need to do anything + return; } // Get the OID for autoinc (if exists) diff --git a/dbcon/execplan/predicateoperator.h b/dbcon/execplan/predicateoperator.h index b83931394..08f0c40cf 100644 --- a/dbcon/execplan/predicateoperator.h +++ b/dbcon/execplan/predicateoperator.h @@ -36,6 +36,7 @@ #endif #include #include +#include #include "expressionparser.h" #include "returnedcolumn.h" @@ -456,16 +457,20 @@ inline bool PredicateOperator::getBoolVal(rowgroup::Row& row, bool& isNull, Retu return !ret; } + // MCOL-1559 + std::string val1 = lop->getStrVal(row, isNull); if (isNull) return false; - const std::string& val1 = lop->getStrVal(row, isNull); - + std::string val2 = rop->getStrVal(row, isNull); if (isNull) return false; - return strCompare(val1, rop->getStrVal(row, isNull)) && !isNull; - } + boost::trim_right_if(val1, boost::is_any_of(" ")); + boost::trim_right_if(val2, boost::is_any_of(" ")); + + return strCompare(val1, val2); + } //FIXME: ??? case execplan::CalpontSystemCatalog::VARBINARY: diff --git a/dbcon/joblist/jlf_execplantojoblist.cpp b/dbcon/joblist/jlf_execplantojoblist.cpp index f3782c9d5..fff1b12fb 100644 --- a/dbcon/joblist/jlf_execplantojoblist.cpp +++ b/dbcon/joblist/jlf_execplantojoblist.cpp @@ -1636,7 +1636,6 @@ const JobStepVector doSimpleFilter(SimpleFilter* sf, JobInfo& jobInfo) string constval(cc->constval()); - CalpontSystemCatalog::OID dictOid = 0; CalpontSystemCatalog::ColType ct = sc->colType(); const PseudoColumn* pc = dynamic_cast(sc); diff --git a/dbcon/joblist/lbidlist.cpp b/dbcon/joblist/lbidlist.cpp index c317defc9..7852562ef 100644 --- a/dbcon/joblist/lbidlist.cpp +++ b/dbcon/joblist/lbidlist.cpp @@ -749,6 +749,7 @@ bool LBIDList::CasualPartitionPredicate(const int64_t Min, int64_t tMax = Max; dataconvert::DataConvert::trimWhitespace(tMin); dataconvert::DataConvert::trimWhitespace(tMax); + dataconvert::DataConvert::trimWhitespace(value); scan = compareVal(order_swap(tMin), order_swap(tMax), order_swap(value), op, lcf); diff --git a/dbcon/mysql/columnstore_info.sql b/dbcon/mysql/columnstore_info.sql index d0433a0d9..7655b5f16 100644 --- a/dbcon/mysql/columnstore_info.sql +++ b/dbcon/mysql/columnstore_info.sql @@ -98,4 +98,23 @@ BEGIN SELECT CONCAT((SELECT SUM(data_size) FROM information_schema.columnstore_extents ce left join information_schema.columnstore_columns cc on ce.object_id = cc.object_id where compression_type='Snappy') / (SELECT SUM(compressed_data_size) FROM information_schema.columnstore_files WHERE compressed_data_size IS NOT NULL), ':1') COMPRESSION_RATIO; END // +create procedure columnstore_upgrade() +`columnstore_upgrade`: BEGIN + DECLARE done INTEGER DEFAULT 0; + DECLARE schema_table VARCHAR(100) DEFAULT ""; + DECLARE table_list CURSOR FOR select concat('`', table_schema,'`.`',table_name,'`') from information_schema.tables where engine='columnstore'; + DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = 1; + OPEN table_list; + tlist: LOOP + FETCH table_list INTO schema_table; + IF done = 1 THEN LEAVE tlist; + END IF; + SET @sql_query = concat('ALTER TABLE ', schema_table, ' COMMENT=\'\''); + PREPARE stmt FROM @sql_query; + EXECUTE stmt; + DEALLOCATE PREPARE stmt; + END LOOP; +END // +delimiter ; + DELIMITER ; diff --git a/dbcon/mysql/ha_calpont_ddl.cpp b/dbcon/mysql/ha_calpont_ddl.cpp index a59701e9b..191ff610a 100644 --- a/dbcon/mysql/ha_calpont_ddl.cpp +++ b/dbcon/mysql/ha_calpont_ddl.cpp @@ -2095,8 +2095,7 @@ int ha_calpont_impl_create_(const char* name, TABLE* table_arg, HA_CREATE_INFO* if ( schemaSyncOnly && isCreate) return rc; - //this is replcated DDL, treat it just like SSO - if (thd->slave_thread) + if (thd->slave_thread && !ci.replicationEnabled) return rc; //@bug 5660. Error out REAL DDL/DML on slave node. @@ -2294,8 +2293,7 @@ int ha_calpont_impl_delete_table_(const char* db, const char* name, cal_connecti return 0; } - //this is replcated DDL, treat it just like SSO - if (thd->slave_thread) + if (thd->slave_thread && !ci.replicationEnabled) return 0; //@bug 5660. Error out REAL DDL/DML on slave node. @@ -2434,8 +2432,7 @@ int ha_calpont_impl_rename_table_(const char* from, const char* to, cal_connecti pair toPair; string stmt; - //this is replicated DDL, treat it just like SSO - if (thd->slave_thread) + if (thd->slave_thread && !ci.replicationEnabled) return 0; //@bug 5660. Error out REAL DDL/DML on slave node. diff --git a/dbcon/mysql/ha_calpont_dml.cpp b/dbcon/mysql/ha_calpont_dml.cpp index dd2b09135..66edb13fa 100644 --- a/dbcon/mysql/ha_calpont_dml.cpp +++ b/dbcon/mysql/ha_calpont_dml.cpp @@ -2080,7 +2080,8 @@ int ha_calpont_impl_commit_ (handlerton* hton, THD* thd, bool all, cal_connectio thd->infinidb_vtable.vtable_state == THD::INFINIDB_SELECT_VTABLE ) return rc; - if (thd->slave_thread) return 0; + if (thd->slave_thread && !ci.replicationEnabled) + return 0; std::string command("COMMIT"); #ifdef INFINIDB_DEBUG diff --git a/dbcon/mysql/ha_calpont_execplan.cpp b/dbcon/mysql/ha_calpont_execplan.cpp index e553254e3..e617e5959 100644 --- a/dbcon/mysql/ha_calpont_execplan.cpp +++ b/dbcon/mysql/ha_calpont_execplan.cpp @@ -7140,7 +7140,9 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i // for subquery, order+limit by will be supported in infinidb. build order by columns // @todo union order by and limit support - if (gwi.hasWindowFunc || gwi.subSelectType != CalpontSelectExecutionPlan::MAIN_SELECT) + if (gwi.hasWindowFunc + || gwi.subSelectType != CalpontSelectExecutionPlan::MAIN_SELECT + || ( isUnion && ordercol )) { for (; ordercol; ordercol = ordercol->next) { diff --git a/dbcon/mysql/ha_calpont_impl.cpp b/dbcon/mysql/ha_calpont_impl.cpp index 57c17c6e7..b20354726 100644 --- a/dbcon/mysql/ha_calpont_impl.cpp +++ b/dbcon/mysql/ha_calpont_impl.cpp @@ -1012,7 +1012,6 @@ uint32_t doUpdateDelete(THD* thd) cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); - //@bug 5660. Error out DDL/DML on slave node, or on local query node if (ci->isSlaveNode && !thd->slave_thread) { string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_DML_DDL_SLAVE); @@ -1034,7 +1033,14 @@ uint32_t doUpdateDelete(THD* thd) // stats start ci->stats.reset(); ci->stats.setStartTime(); - ci->stats.fUser = thd->main_security_ctx.user; + if (thd->main_security_ctx.user) + { + ci->stats.fUser = thd->main_security_ctx.user; + } + else + { + ci->stats.fUser = ""; + } if (thd->main_security_ctx.host) ci->stats.fHost = thd->main_security_ctx.host; @@ -2129,8 +2135,9 @@ int ha_calpont_impl_rnd_init(TABLE* table) // prevent "create table as select" from running on slave thd->infinidb_vtable.hasInfiniDBTable = true; - /* If this node is the slave, ignore DML to IDB tables */ - if (thd->slave_thread && ( + cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + + if (thd->slave_thread && !ci->replicationEnabled && ( thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_UPDATE || @@ -2184,7 +2191,6 @@ int ha_calpont_impl_rnd_init(TABLE* table) set_fe_conn_info_ptr((void*)new cal_connection_info()); cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); - idbassert(ci != 0); // MySQL sometimes calls rnd_init multiple times, plan should only be @@ -2296,7 +2302,14 @@ int ha_calpont_impl_rnd_init(TABLE* table) { ci->stats.reset(); // reset query stats ci->stats.setStartTime(); - ci->stats.fUser = thd->main_security_ctx.user; + if (thd->main_security_ctx.user) + { + ci->stats.fUser = thd->main_security_ctx.user; + } + else + { + ci->stats.fUser = ""; + } if (thd->main_security_ctx.host) ci->stats.fHost = thd->main_security_ctx.host; @@ -2661,8 +2674,9 @@ int ha_calpont_impl_rnd_next(uchar* buf, TABLE* table) { THD* thd = current_thd; - /* If this node is the slave, ignore DML to IDB tables */ - if (thd->slave_thread && ( + cal_connection_info* ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + + if (thd->slave_thread && !ci->replicationEnabled && ( thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_UPDATE || @@ -2673,7 +2687,6 @@ int ha_calpont_impl_rnd_next(uchar* buf, TABLE* table) thd->lex->sql_command == SQLCOM_LOAD)) return 0; - if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ERROR) return ER_INTERNAL_ERROR; @@ -2703,7 +2716,6 @@ int ha_calpont_impl_rnd_next(uchar* buf, TABLE* table) set_fe_conn_info_ptr((void*)new cal_connection_info()); cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); - // @bug 3078 if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) { @@ -2767,8 +2779,17 @@ int ha_calpont_impl_rnd_end(TABLE* table) int rc = 0; THD* thd = current_thd; cal_connection_info* ci = NULL; + bool replicationEnabled = false; - if (thd->slave_thread && ( + if (thd->infinidb_vtable.cal_conn_info) + ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + + if (ci && ci->replicationEnabled) + { + replicationEnabled = true; + } + + if (thd->slave_thread && !replicationEnabled && ( thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_UPDATE || @@ -2783,7 +2804,6 @@ int ha_calpont_impl_rnd_end(TABLE* table) if (get_fe_conn_info_ptr() != NULL) ci = reinterpret_cast(get_fe_conn_info_ptr()); - if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ORDER_BY ) { thd->infinidb_vtable.vtable_state = THD::INFINIDB_SELECT_VTABLE; // flip back to normal state @@ -3042,9 +3062,8 @@ int ha_calpont_impl_write_row(uchar* buf, TABLE* table) cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); - if (thd->slave_thread) return 0; - - + if (thd->slave_thread && !ci->replicationEnabled) + return 0; if (ci->alterTableState > 0) return 0; @@ -3129,7 +3148,8 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table) if (thd->infinidb_vtable.vtable_state != THD::INFINIDB_ALTER_VTABLE) thd->infinidb_vtable.isInfiniDBDML = true; - if (thd->slave_thread) return; + if (thd->slave_thread && !ci->replicationEnabled) + return; //@bug 5660. Error out DDL/DML on slave node, or on local query node if (ci->isSlaveNode && thd->infinidb_vtable.vtable_state != THD::INFINIDB_ALTER_VTABLE) @@ -3568,7 +3588,14 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table) // query stats. only collect execution time and rows inserted for insert/load_data_infile ci->stats.reset(); ci->stats.setStartTime(); - ci->stats.fUser = thd->main_security_ctx.user; + if (thd->main_security_ctx.user) + { + ci->stats.fUser = thd->main_security_ctx.user; + } + else + { + ci->stats.fUser = ""; + } if (thd->main_security_ctx.host) ci->stats.fHost = thd->main_security_ctx.host; @@ -3654,7 +3681,8 @@ int ha_calpont_impl_end_bulk_insert(bool abort, TABLE* table) cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); - if (thd->slave_thread) return 0; + if (thd->slave_thread && !ci->replicationEnabled) + return 0; int rc = 0; @@ -4317,7 +4345,14 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE { ci->stats.reset(); // reset query stats ci->stats.setStartTime(); - ci->stats.fUser = thd->main_security_ctx.user; + if (thd->main_security_ctx.user) + { + ci->stats.fUser = thd->main_security_ctx.user; + } + else + { + ci->stats.fUser = ""; + } if (thd->main_security_ctx.host) ci->stats.fHost = thd->main_security_ctx.host; @@ -4736,19 +4771,6 @@ int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE { THD* thd = current_thd; - /* If this node is the slave, ignore DML to IDB tables */ - if (thd->slave_thread && ( - thd->lex->sql_command == SQLCOM_INSERT || - thd->lex->sql_command == SQLCOM_INSERT_SELECT || - thd->lex->sql_command == SQLCOM_UPDATE || - thd->lex->sql_command == SQLCOM_UPDATE_MULTI || - thd->lex->sql_command == SQLCOM_DELETE || - thd->lex->sql_command == SQLCOM_DELETE_MULTI || - thd->lex->sql_command == SQLCOM_TRUNCATE || - thd->lex->sql_command == SQLCOM_LOAD)) - return 0; - - if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ERROR) return ER_INTERNAL_ERROR; @@ -4774,6 +4796,17 @@ int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); + if (thd->slave_thread && !ci->replicationEnabled && ( + thd->lex->sql_command == SQLCOM_INSERT || + thd->lex->sql_command == SQLCOM_INSERT_SELECT || + thd->lex->sql_command == SQLCOM_UPDATE || + thd->lex->sql_command == SQLCOM_UPDATE_MULTI || + thd->lex->sql_command == SQLCOM_DELETE || + thd->lex->sql_command == SQLCOM_DELETE_MULTI || + thd->lex->sql_command == SQLCOM_TRUNCATE || + thd->lex->sql_command == SQLCOM_LOAD)) + return 0; + // @bug 3078 if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) { @@ -4839,7 +4872,19 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* THD* thd = current_thd; cal_connection_info* ci = NULL; - if (thd->slave_thread && ( + thd->infinidb_vtable.isNewQuery = true; + thd->infinidb_vtable.isUnion = false; + + if (get_fe_conn_info_ptr() != NULL) + ci = reinterpret_cast(get_fe_conn_info_ptr()); + + if (!ci) + { + thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info()); + ci = reinterpret_cast(thd->infinidb_vtable.cal_conn_info); + } + + if (thd->slave_thread && !ci->replicationEnabled && ( thd->lex->sql_command == SQLCOM_INSERT || thd->lex->sql_command == SQLCOM_INSERT_SELECT || thd->lex->sql_command == SQLCOM_UPDATE || @@ -4850,12 +4895,6 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* thd->lex->sql_command == SQLCOM_LOAD)) return 0; - thd->infinidb_vtable.isNewQuery = true; - thd->infinidb_vtable.isUnion = false; - - if (get_fe_conn_info_ptr() != NULL) - ci = reinterpret_cast(get_fe_conn_info_ptr()); - if (((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) ) { diff --git a/dbcon/mysql/ha_calpont_impl_if.h b/dbcon/mysql/ha_calpont_impl_if.h index 4f0046745..d53d6e652 100644 --- a/dbcon/mysql/ha_calpont_impl_if.h +++ b/dbcon/mysql/ha_calpont_impl_if.h @@ -251,10 +251,18 @@ struct cal_connection_info useXbit(false), utf8(false), useCpimport(1), - delimiter('\7') + delimiter('\7'), + replicationEnabled(false) { // check if this is a slave mysql daemon isSlaveNode = checkSlave(); + + std::string option = config::Config::makeConfig()->getConfig("SystemConfig", "ReplicationEnabled"); + + if (!option.compare("Y")) + { + replicationEnabled = true; + } } static bool checkSlave() @@ -319,6 +327,7 @@ struct cal_connection_info char delimiter; char enclosed_by; std::vector columnTypes; + bool replicationEnabled; // MCOL-1101 remove compilation unit variable rmParms std::vector rmParms; }; diff --git a/primitives/primproc/dictstep.cpp b/primitives/primproc/dictstep.cpp index abd99ada3..47bfeac25 100644 --- a/primitives/primproc/dictstep.cpp +++ b/primitives/primproc/dictstep.cpp @@ -30,6 +30,7 @@ #include #include +#include #include "bpp.h" #include "primitiveserver.h" @@ -93,6 +94,7 @@ void DictStep::createCommand(ByteStream& bs) for (uint32_t i = 0; i < filterCount; i++) { bs >> strTmp; + boost::trim_right_if(strTmp, boost::is_any_of(" ")); //cout << " " << strTmp << endl; eqFilter->insert(strTmp); } diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index 76ff8ef3c..9359556e9 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -28,6 +28,7 @@ #include #include #include +#include //#define NDEBUG #include #include @@ -1804,6 +1805,7 @@ private: for (i = 0; i < count; i++) { *bs >> str; + boost::trim_right_if(str, boost::is_any_of(" ")); filter->insert(str); } diff --git a/utils/regr/corr.h b/utils/regr/corr.h index eba7597eb..d1b5f55ac 100644 --- a/utils/regr/corr.h +++ b/utils/regr/corr.h @@ -25,8 +25,7 @@ * Columnstore interface for for the corr function * * - * CREATE AGGREGATE FUNCTION corr returns REAL - * soname 'libregr_mysql.so'; + * CREATE AGGREGATE FUNCTION corr returns REAL soname 'libregr_mysql.so'; * */ #ifndef HEADER_corr diff --git a/utils/regr/covar_pop.h b/utils/regr/covar_pop.h index fc47d4497..dda396fb9 100644 --- a/utils/regr/covar_pop.h +++ b/utils/regr/covar_pop.h @@ -25,8 +25,7 @@ * Columnstore interface for for the covar_pop function * * - * CREATE AGGREGATE FUNCTION covar_pop returns REAL - * soname 'libregr_mysql.so'; + * CREATE AGGREGATE FUNCTION covar_pop returns REAL soname 'libregr_mysql.so'; * */ #ifndef HEADER_covar_pop diff --git a/utils/regr/covar_samp.h b/utils/regr/covar_samp.h index 6aba65054..a65625520 100644 --- a/utils/regr/covar_samp.h +++ b/utils/regr/covar_samp.h @@ -25,8 +25,7 @@ * Columnstore interface for for the covar_samp function * * - * CREATE AGGREGATE FUNCTION covar_samp returns REAL - * soname 'libregr_mysql.so'; + * CREATE AGGREGATE FUNCTION covar_samp returns REAL soname 'libregr_mysql.so'; * */ #ifndef HEADER_covar_samp diff --git a/utils/regr/regr_avgx.cpp b/utils/regr/regr_avgx.cpp index bf010e648..8e4314d01 100644 --- a/utils/regr/regr_avgx.cpp +++ b/utils/regr/regr_avgx.cpp @@ -63,13 +63,6 @@ mcsv1_UDAF::ReturnCode regr_avgx::init(mcsv1Context* context, context->setErrorMessage("regr_avgx() with a non-numeric x argument"); return mcsv1_UDAF::ERROR; } - if (!(isNumeric(colTypes[1].dataType))) - { - // The error message will be prepended with - // "The storage engine for the table doesn't support " - context->setErrorMessage("regr_avgx() with a non-numeric independant (second) argument"); - return mcsv1_UDAF::ERROR; - } context->setUserDataSize(sizeof(regr_avgx_data)); context->setResultType(CalpontSystemCatalog::DOUBLE); diff --git a/utils/regr/regr_avgx.h b/utils/regr/regr_avgx.h index 75791f769..960a6a892 100644 --- a/utils/regr/regr_avgx.h +++ b/utils/regr/regr_avgx.h @@ -25,8 +25,7 @@ * Columnstore interface for for the regr_avgx function * * - * CREATE AGGREGATE FUNCTION regr_avgx returns REAL soname - * 'libregr_mysql.so'; + * CREATE AGGREGATE FUNCTION regr_avgx returns REAL soname 'libregr_mysql.so'; * */ #ifndef HEADER_regr_avgx diff --git a/utils/regr/regr_avgy.cpp b/utils/regr/regr_avgy.cpp index 7325d991f..3d49e96b4 100644 --- a/utils/regr/regr_avgy.cpp +++ b/utils/regr/regr_avgy.cpp @@ -60,14 +60,7 @@ mcsv1_UDAF::ReturnCode regr_avgy::init(mcsv1Context* context, { // The error message will be prepended with // "The storage engine for the table doesn't support " - context->setErrorMessage("regr_avgy() with a non-numeric x argument"); - return mcsv1_UDAF::ERROR; - } - if (!(isNumeric(colTypes[0].dataType))) - { - // The error message will be prepended with - // "The storage engine for the table doesn't support " - context->setErrorMessage("regr_avgy() with a non-numeric dependant (first) argument"); + context->setErrorMessage("regr_avgy() with a non-numeric y argument"); return mcsv1_UDAF::ERROR; } diff --git a/utils/regr/regr_avgy.h b/utils/regr/regr_avgy.h index c99021f9f..c2a3020da 100644 --- a/utils/regr/regr_avgy.h +++ b/utils/regr/regr_avgy.h @@ -25,8 +25,7 @@ * Columnstore interface for for the regr_avgy function * * - * CREATE AGGREGATE FUNCTION regr_avgy returns REAL soname - * 'libregr_mysql.so'; + * CREATE AGGREGATE FUNCTION regr_avgy returns REAL soname 'libregr_mysql.so'; * */ #ifndef HEADER_regr_avgy diff --git a/utils/regr/regr_count.h b/utils/regr/regr_count.h index 4f4fc558e..25cde7898 100644 --- a/utils/regr/regr_count.h +++ b/utils/regr/regr_count.h @@ -25,8 +25,7 @@ * Columnstore interface for for the regr_count function * * - * CREATE AGGREGATE FUNCTION regr_count returns INTEGER - * soname 'libregr_mysql.so'; + * CREATE AGGREGATE FUNCTION regr_count returns INTEGER soname 'libregr_mysql.so'; * */ #ifndef HEADER_regr_count diff --git a/utils/regr/regr_intercept.h b/utils/regr/regr_intercept.h index ed82477cd..ef8dc6de5 100644 --- a/utils/regr/regr_intercept.h +++ b/utils/regr/regr_intercept.h @@ -25,8 +25,7 @@ * Columnstore interface for for the regr_intercept function * * - * CREATE AGGREGATE FUNCTION regr_intercept returns REAL - * soname 'libregr_mysql.so'; + * CREATE AGGREGATE FUNCTION regr_intercept returns REAL soname 'libregr_mysql.so'; * */ #ifndef HEADER_regr_intercept diff --git a/utils/regr/regr_r2.h b/utils/regr/regr_r2.h index d440ad5a1..968814067 100644 --- a/utils/regr/regr_r2.h +++ b/utils/regr/regr_r2.h @@ -25,8 +25,7 @@ * Columnstore interface for for the regr_r2 function * * - * CREATE AGGREGATE FUNCTION regr_r2 returns REAL - * soname 'libregr_mysql.so'; + * CREATE AGGREGATE FUNCTION regr_r2 returns REAL soname 'libregr_mysql.so'; * */ #ifndef HEADER_regr_r2 diff --git a/utils/regr/regr_slope.h b/utils/regr/regr_slope.h index 9c148d895..8a20494c1 100644 --- a/utils/regr/regr_slope.h +++ b/utils/regr/regr_slope.h @@ -25,8 +25,7 @@ * Columnstore interface for for the regr_slope function * * - * CREATE AGGREGATE FUNCTION regr_slope returns REAL - * soname 'libregr_mysql.so'; + * CREATE AGGREGATE FUNCTION regr_slope returns REAL soname 'libregr_mysql.so'; * */ #ifndef HEADER_regr_slope diff --git a/utils/regr/regr_sxx.cpp b/utils/regr/regr_sxx.cpp index 5769a227b..4d5fac370 100644 --- a/utils/regr/regr_sxx.cpp +++ b/utils/regr/regr_sxx.cpp @@ -132,7 +132,7 @@ mcsv1_UDAF::ReturnCode regr_sxx::evaluate(mcsv1Context* context, static_any::any long double sumx2 = data->sumx2; long double var_popx = (sumx2 - (sumx * sumx / N)) / N; - valOut = static_cast(data->cnt * var_popx); + valOut = static_cast(N * var_popx); } return mcsv1_UDAF::SUCCESS; } diff --git a/utils/regr/regr_sxx.h b/utils/regr/regr_sxx.h index 14d82bd55..53c771b6f 100644 --- a/utils/regr/regr_sxx.h +++ b/utils/regr/regr_sxx.h @@ -25,8 +25,7 @@ * Columnstore interface for for the regr_sxx function * * - * CREATE AGGREGATE FUNCTION regr_sxx returns REAL - * soname 'libregr_mysql.so'; + * CREATE AGGREGATE FUNCTION regr_sxx returns REAL soname 'libregr_mysql.so'; * */ #ifndef HEADER_regr_sxx diff --git a/utils/regr/regr_sxy.h b/utils/regr/regr_sxy.h index 25aa34145..6371c6fed 100644 --- a/utils/regr/regr_sxy.h +++ b/utils/regr/regr_sxy.h @@ -25,8 +25,7 @@ * Columnstore interface for for the regr_sxy function * * - * CREATE AGGREGATE FUNCTION regr_sxy returns REAL - * soname 'libregr_mysql.so'; + * CREATE AGGREGATE FUNCTION regr_sxy returns REAL soname 'libregr_mysql.so'; * */ #ifndef HEADER_regr_sxy diff --git a/utils/regr/regr_syy.cpp b/utils/regr/regr_syy.cpp index 014a28389..6febb9579 100644 --- a/utils/regr/regr_syy.cpp +++ b/utils/regr/regr_syy.cpp @@ -132,7 +132,7 @@ mcsv1_UDAF::ReturnCode regr_syy::evaluate(mcsv1Context* context, static_any::any long double sumy2 = data->sumy2; long double var_popy = (sumy2 - (sumy * sumy / N)) / N; - valOut = static_cast(data->cnt * var_popy); + valOut = static_cast(N * var_popy); } return mcsv1_UDAF::SUCCESS; } diff --git a/utils/regr/regr_syy.h b/utils/regr/regr_syy.h index a837fab13..d1a582f4d 100644 --- a/utils/regr/regr_syy.h +++ b/utils/regr/regr_syy.h @@ -25,8 +25,7 @@ * Columnstore interface for for the regr_syy function * * - * CREATE AGGREGATE FUNCTION regr_syy returns REAL - * soname 'libregr_mysql.so'; + * CREATE AGGREGATE FUNCTION regr_syy returns REAL soname 'libregr_mysql.so'; * */ #ifndef HEADER_regr_syy diff --git a/utils/regr/regrmysql.cpp b/utils/regr/regrmysql.cpp index 4980108e3..2570163f1 100644 --- a/utils/regr/regrmysql.cpp +++ b/utils/regr/regrmysql.cpp @@ -147,7 +147,7 @@ extern "C" */ struct regr_avgx_data { - double sumx; + long double sumx; int64_t cnt; }; @@ -159,21 +159,29 @@ extern "C" struct regr_avgx_data* data; if (args->arg_count != 2) { - strcpy(message,"regr_avgx() requires two arguments"); - return 1; + strcpy(message,"regr_avgx() requires two arguments"); + return 1; } if (!(isNumeric(args->arg_type[1], args->attributes[1]))) { strcpy(message,"regr_avgx() with a non-numeric independant (second) argument"); return 1; } + if (args->arg_type[1] == DECIMAL_RESULT && initid->decimals != DECIMAL_NOT_SPECIFIED) + { + initid->decimals += 4; + } + else + { + initid->decimals = DECIMAL_NOT_SPECIFIED; + } - if (!(data = (struct regr_avgx_data*) malloc(sizeof(struct regr_avgx_data)))) + if (!(data = (struct regr_avgx_data*) malloc(sizeof(struct regr_avgx_data)))) { - strmov(message,"Couldn't allocate memory"); - return 1; + strmov(message,"Couldn't allocate memory"); + return 1; } - data->sumx = 0; + data->sumx = 0; data->cnt = 0; initid->ptr = (char*)data; @@ -226,7 +234,17 @@ extern "C" char* is_null, char* error __attribute__((unused))) { struct regr_avgx_data* data = (struct regr_avgx_data*)initid->ptr; - return data->sumx / data->cnt; + double valOut = 0; + if (data->cnt > 0) + { + valOut = static_cast(data->sumx / data->cnt); + } + else + { + *is_null = 1; + } + + return valOut; } //======================================================================= @@ -236,8 +254,8 @@ extern "C" */ struct regr_avgy_data { - double sumy; - int64_t cnt; + long double sumy; + int64_t cnt; }; #ifdef _MSC_VER @@ -248,8 +266,8 @@ extern "C" struct regr_avgy_data* data; if (args->arg_count != 2) { - strcpy(message,"regr_avgy() requires two arguments"); - return 1; + strcpy(message,"regr_avgy() requires two arguments"); + return 1; } if (!(isNumeric(args->arg_type[0], args->attributes[0]))) { @@ -257,10 +275,19 @@ extern "C" return 1; } - if (!(data = (struct regr_avgy_data*) malloc(sizeof(struct regr_avgy_data)))) + if (args->arg_type[0] == DECIMAL_RESULT && initid->decimals != DECIMAL_NOT_SPECIFIED) + { + initid->decimals += 4; + } + else + { + initid->decimals = DECIMAL_NOT_SPECIFIED; + } + + if (!(data = (struct regr_avgy_data*) malloc(sizeof(struct regr_avgy_data)))) { - strmov(message,"Couldn't allocate memory"); - return 1; + strmov(message,"Couldn't allocate memory"); + return 1; } data->sumy = 0; data->cnt = 0; @@ -315,7 +342,16 @@ extern "C" char* is_null, char* error __attribute__((unused))) { struct regr_avgy_data* data = (struct regr_avgy_data*)initid->ptr; - return data->sumy / data->cnt; + double valOut = 0; + if (data->cnt > 0) + { + valOut = static_cast(data->sumy / data->cnt); + } + else + { + *is_null = 1; + } + return valOut; } //======================================================================= @@ -336,14 +372,14 @@ extern "C" struct regr_count_data* data; if (args->arg_count != 2) { - strcpy(message,"regr_count() requires two arguments"); - return 1; + strcpy(message,"regr_count() requires two arguments"); + return 1; } if (!(data = (struct regr_count_data*) malloc(sizeof(struct regr_count_data)))) { - strmov(message,"Couldn't allocate memory"); - return 1; + strmov(message,"Couldn't allocate memory"); + return 1; } data->cnt = 0; @@ -405,10 +441,10 @@ extern "C" struct regr_slope_data { int64_t cnt; - double sumx; - double sumx2; // sum of (x squared) - double sumy; - double sumxy; // sum of (x*y) + long double sumx; + long double sumx2; // sum of (x squared) + long double sumy; + long double sumxy; // sum of (x*y) }; #ifdef _MSC_VER @@ -419,8 +455,8 @@ extern "C" struct regr_slope_data* data; if (args->arg_count != 2) { - strcpy(message,"regr_slope() requires two arguments"); - return 1; + strcpy(message,"regr_slope() requires two arguments"); + return 1; } if (!(isNumeric(args->arg_type[0], args->attributes[0]) && isNumeric(args->arg_type[1], args->attributes[1]))) { @@ -428,10 +464,12 @@ extern "C" return 1; } + initid->decimals = DECIMAL_NOT_SPECIFIED; + if (!(data = (struct regr_slope_data*) malloc(sizeof(struct regr_slope_data)))) { - strmov(message,"Couldn't allocate memory"); - return 1; + strmov(message,"Couldn't allocate memory"); + return 1; } data->cnt = 0; data->sumx = 0.0; @@ -497,20 +535,23 @@ extern "C" { struct regr_slope_data* data = (struct regr_slope_data*)initid->ptr; double N = data->cnt; + double valOut = 0; + *is_null = 1; if (N > 0) { - double sumx = data->sumx; - double sumy = data->sumy; - double sumx2 = data->sumx2; - double sumxy = data->sumxy; - double variance = (N * sumx2) - (sumx * sumx); - if (variance) + long double sumx = data->sumx; + long double sumy = data->sumy; + long double sumx2 = data->sumx2; + long double sumxy = data->sumxy; + long double covar_pop = N * sumxy - sumx * sumy; + long double var_pop = N * sumx2 - sumx * sumx; + if (var_pop != 0) { - return ((N * sumxy) - (sumx * sumy)) / variance; + valOut = static_cast(covar_pop / var_pop); + *is_null = 0; } } - *is_null = 1; - return 0; + return valOut; } //======================================================================= @@ -521,10 +562,10 @@ extern "C" struct regr_intercept_data { int64_t cnt; - double sumx; - double sumx2; // sum of (x squared) - double sumy; - double sumxy; // sum of (x*y) + long double sumx; + long double sumx2; // sum of (x squared) + long double sumy; + long double sumxy; // sum of (x*y) }; #ifdef _MSC_VER @@ -535,8 +576,8 @@ extern "C" struct regr_intercept_data* data; if (args->arg_count != 2) { - strcpy(message,"regr_intercept() requires two arguments"); - return 1; + strcpy(message,"regr_intercept() requires two arguments"); + return 1; } if (!(isNumeric(args->arg_type[0], args->attributes[0]) && isNumeric(args->arg_type[1], args->attributes[1]))) { @@ -544,10 +585,11 @@ extern "C" return 1; } - if (!(data = (struct regr_intercept_data*) malloc(sizeof(struct regr_intercept_data)))) + initid->decimals = DECIMAL_NOT_SPECIFIED; + if (!(data = (struct regr_intercept_data*) malloc(sizeof(struct regr_intercept_data)))) { - strmov(message,"Couldn't allocate memory"); - return 1; + strmov(message,"Couldn't allocate memory"); + return 1; } data->cnt = 0; data->sumx = 0.0; @@ -613,22 +655,23 @@ extern "C" { struct regr_intercept_data* data = (struct regr_intercept_data*)initid->ptr; double N = data->cnt; + double valOut = 0; + *is_null = 1; if (N > 0) { - double sumx = data->sumx; - double sumy = data->sumy; - double sumx2 = data->sumx2; - double sumxy = data->sumxy; - double slope = 0; - double variance = (N * sumx2) - (sumx * sumx); - if (variance) + long double sumx = data->sumx; + long double sumy = data->sumy; + long double sumx2 = data->sumx2; + long double sumxy = data->sumxy; + long double numerator = sumy * sumx2 - sumx * sumxy; + long double var_pop = (N * sumx2) - (sumx * sumx); + if (var_pop != 0) { - slope = ((N * sumxy) - (sumx * sumy)) / variance; + valOut = static_cast(numerator / var_pop); + *is_null = 0; } - return (sumy - (slope * sumx)) / N; } - *is_null = 1; - return 0; + return valOut; } //======================================================================= @@ -639,11 +682,11 @@ extern "C" struct regr_r2_data { int64_t cnt; - double sumx; - double sumx2; // sum of (x squared) - double sumy; - double sumy2; // sum of (y squared) - double sumxy; // sum of (x*y) + long double sumx; + long double sumx2; // sum of (x squared) + long double sumy; + long double sumy2; // sum of (y squared) + long double sumxy; // sum of (x*y) }; #ifdef _MSC_VER @@ -654,8 +697,8 @@ extern "C" struct regr_r2_data* data; if (args->arg_count != 2) { - strcpy(message,"regr_r2() requires two arguments"); - return 1; + strcpy(message,"regr_r2() requires two arguments"); + return 1; } if (!(isNumeric(args->arg_type[0], args->attributes[0]) && isNumeric(args->arg_type[1], args->attributes[1]))) { @@ -663,10 +706,12 @@ extern "C" return 1; } - if (!(data = (struct regr_r2_data*) malloc(sizeof(struct regr_r2_data)))) + initid->decimals = DECIMAL_NOT_SPECIFIED; + + if (!(data = (struct regr_r2_data*) malloc(sizeof(struct regr_r2_data)))) { - strmov(message,"Couldn't allocate memory"); - return 1; + strmov(message,"Couldn't allocate memory"); + return 1; } data->cnt = 0; data->sumx = 0.0; @@ -735,34 +780,38 @@ extern "C" { struct regr_r2_data* data = (struct regr_r2_data*)initid->ptr; double N = data->cnt; + double valOut = 0; if (N > 0) { - double sumx = data->sumx; - double sumy = data->sumy; - double sumx2 = data->sumx2; - double sumy2 = data->sumy2; - double sumxy = data->sumxy; - double var_popx = (sumx2 - (sumx * sumx / N)) / N; + long double sumx = data->sumx; + long double sumy = data->sumy; + long double sumx2 = data->sumx2; + long double sumy2 = data->sumy2; + long double sumxy = data->sumxy; + long double var_popx = (sumx2 - (sumx * sumx / N)) / N; if (var_popx == 0) { // When var_popx is 0, NULL is the result. *is_null = 1; return 0; } - double var_popy = (sumy2 - (sumy * sumy / N)) / N; + long double var_popy = (sumy2 - (sumy * sumy / N)) / N; if (var_popy == 0) { // When var_popy is 0, 1 is the result return 1; } - double std_popx = sqrt(var_popx); - double std_popy = sqrt(var_popy); - double covar_pop = (sumxy - ((sumx * sumy) / N)) / N; - double corr = covar_pop / (std_popy * std_popx); - return corr * corr; + long double std_popx = sqrt(var_popx); + long double std_popy = sqrt(var_popy); + long double covar_pop = (sumxy - ((sumx * sumy) / N)) / N; + long double corr = covar_pop / (std_popy * std_popx); + valOut = static_cast(corr * corr); } - *is_null = 1; - return 0; + else + { + *is_null = 1; + } + return valOut; } //======================================================================= @@ -773,11 +822,11 @@ extern "C" struct corr_data { int64_t cnt; - double sumx; - double sumx2; // sum of (x squared) - double sumy; - double sumy2; // sum of (y squared) - double sumxy; // sum of (x*y) + long double sumx; + long double sumx2; // sum of (x squared) + long double sumy; + long double sumy2; // sum of (y squared) + long double sumxy; // sum of (x*y) }; #ifdef _MSC_VER @@ -788,8 +837,8 @@ extern "C" struct corr_data* data; if (args->arg_count != 2) { - strcpy(message,"corr() requires two arguments"); - return 1; + strcpy(message,"corr() requires two arguments"); + return 1; } if (!(isNumeric(args->arg_type[0], args->attributes[0]) && isNumeric(args->arg_type[1], args->attributes[1]))) { @@ -797,10 +846,12 @@ extern "C" return 1; } - if (!(data = (struct corr_data*) malloc(sizeof(struct corr_data)))) + initid->decimals = DECIMAL_NOT_SPECIFIED; + + if (!(data = (struct corr_data*) malloc(sizeof(struct corr_data)))) { - strmov(message,"Couldn't allocate memory"); - return 1; + strmov(message,"Couldn't allocate memory"); + return 1; } data->cnt = 0; data->sumx = 0.0; @@ -869,34 +920,38 @@ extern "C" { struct corr_data* data = (struct corr_data*)initid->ptr; double N = data->cnt; + double valOut = 0; if (N > 0) { - double sumx = data->sumx; - double sumy = data->sumy; - double sumx2 = data->sumx2; - double sumy2 = data->sumy2; - double sumxy = data->sumxy; - double var_popx = (sumx2 - (sumx * sumx / N)) / N; + long double sumx = data->sumx; + long double sumy = data->sumy; + long double sumx2 = data->sumx2; + long double sumy2 = data->sumy2; + long double sumxy = data->sumxy; + long double var_popx = (sumx2 - (sumx * sumx / N)) / N; if (var_popx == 0) { // When var_popx is 0, NULL is the result. *is_null = 1; return 0; } - double var_popy = (sumy2 - (sumy * sumy / N)) / N; + long double var_popy = (sumy2 - (sumy * sumy / N)) / N; if (var_popy == 0) { // When var_popy is 0, 1 is the result return 1; } - double std_popx = sqrt(var_popx); - double std_popy = sqrt(var_popy); - double covar_pop = (sumxy - ((sumx * sumy) / N)) / N; - double corr = covar_pop / (std_popy * std_popx); - return corr; + long double std_popx = sqrt(var_popx); + long double std_popy = sqrt(var_popy); + long double covar_pop = (sumxy - ((sumx * sumy) / N)) / N; + long double corr = covar_pop / (std_popy * std_popx); + return static_cast(corr); } - *is_null = 1; - return 0; + else + { + *is_null = 1; + } + return valOut; } //======================================================================= @@ -907,8 +962,8 @@ extern "C" struct regr_sxx_data { int64_t cnt; - double sumx; - double sumx2; // sum of (x squared) + long double sumx; + long double sumx2; // sum of (x squared) }; #ifdef _MSC_VER @@ -919,8 +974,8 @@ extern "C" struct regr_sxx_data* data; if (args->arg_count != 2) { - strcpy(message,"regr_sxx() requires two arguments"); - return 1; + strcpy(message,"regr_sxx() requires two arguments"); + return 1; } if (!(isNumeric(args->arg_type[1], args->attributes[1]))) { @@ -928,10 +983,12 @@ extern "C" return 1; } - if (!(data = (struct regr_sxx_data*) malloc(sizeof(struct regr_sxx_data)))) + initid->decimals = DECIMAL_NOT_SPECIFIED; + + if (!(data = (struct regr_sxx_data*) malloc(sizeof(struct regr_sxx_data)))) { - strmov(message,"Couldn't allocate memory"); - return 1; + strmov(message,"Couldn't allocate memory"); + return 1; } data->cnt = 0; data->sumx = 0.0; @@ -990,15 +1047,19 @@ extern "C" { struct regr_sxx_data* data = (struct regr_sxx_data*)initid->ptr; double N = data->cnt; + double valOut = 0; if (N > 0) { - double sumx = data->sumx; - double sumx2 = data->sumx2; - double var_popx = (sumx2 - (sumx * sumx / N)) / N; - return data->cnt * var_popx; + long double sumx = data->sumx; + long double sumx2 = data->sumx2; + long double var_popx = (sumx2 - (sumx * sumx / N)) / N; + valOut = static_cast(N * var_popx); } - *is_null = 1; - return 0; + else + { + *is_null = 1; + } + return valOut; } //======================================================================= @@ -1008,8 +1069,8 @@ extern "C" struct regr_syy_data { int64_t cnt; - double sumy; - double sumy2; // sum of (y squared) + long double sumy; + long double sumy2; // sum of (y squared) }; #ifdef _MSC_VER @@ -1020,8 +1081,8 @@ extern "C" struct regr_syy_data* data; if (args->arg_count != 2) { - strcpy(message,"regr_syy() requires two arguments"); - return 1; + strcpy(message,"regr_syy() requires two arguments"); + return 1; } if (!(isNumeric(args->arg_type[0], args->attributes[0]))) { @@ -1029,10 +1090,12 @@ extern "C" return 1; } - if (!(data = (struct regr_syy_data*) malloc(sizeof(struct regr_syy_data)))) + initid->decimals = DECIMAL_NOT_SPECIFIED; + + if (!(data = (struct regr_syy_data*) malloc(sizeof(struct regr_syy_data)))) { - strmov(message,"Couldn't allocate memory"); - return 1; + strmov(message,"Couldn't allocate memory"); + return 1; } data->cnt = 0; data->sumy = 0.0; @@ -1091,15 +1154,19 @@ extern "C" { struct regr_syy_data* data = (struct regr_syy_data*)initid->ptr; double N = data->cnt; + double valOut = 0; if (N > 0) { - double sumy = data->sumy; - double sumy2 = data->sumy2; - double var_popy = (sumy2 - (sumy * sumy / N)) / N; - return data->cnt * var_popy; + long double sumy = data->sumy; + long double sumy2 = data->sumy2; + long double var_popy = (sumy2 - (sumy * sumy / N)) / N; + valOut = static_cast(N * var_popy); } - *is_null = 1; - return 0; + else + { + *is_null = 1; + } + return valOut; } //======================================================================= @@ -1110,9 +1177,9 @@ extern "C" struct regr_sxy_data { int64_t cnt; - double sumx; - double sumy; - double sumxy; // sum of (x*y) + long double sumx; + long double sumy; + long double sumxy; // sum of (x*y) }; #ifdef _MSC_VER @@ -1123,8 +1190,8 @@ extern "C" struct regr_sxy_data* data; if (args->arg_count != 2) { - strcpy(message,"regr_sxy() requires two arguments"); - return 1; + strcpy(message,"regr_sxy() requires two arguments"); + return 1; } if (!(isNumeric(args->arg_type[0], args->attributes[0]) && isNumeric(args->arg_type[1], args->attributes[1]))) { @@ -1132,10 +1199,12 @@ extern "C" return 1; } - if (!(data = (struct regr_sxy_data*) malloc(sizeof(struct regr_sxy_data)))) + initid->decimals = DECIMAL_NOT_SPECIFIED; + + if (!(data = (struct regr_sxy_data*) malloc(sizeof(struct regr_sxy_data)))) { - strmov(message,"Couldn't allocate memory"); - return 1; + strmov(message,"Couldn't allocate memory"); + return 1; } data->cnt = 0; data->sumx = 0.0; @@ -1198,16 +1267,21 @@ extern "C" { struct regr_sxy_data* data = (struct regr_sxy_data*)initid->ptr; double N = data->cnt; + double valOut = 0; if (N > 0) { - double sumx = data->sumx; - double sumy = data->sumy; - double sumxy = data->sumxy; - double covar_pop = (sumxy - ((sumx * sumy) / N)) / N; - return data->cnt * covar_pop; + long double sumx = data->sumx; + long double sumy = data->sumy; + long double sumxy = data->sumxy; + long double covar_pop = (sumxy - ((sumx * sumy) / N)) / N; + long double regr_sxy = N * covar_pop; + valOut = static_cast(regr_sxy); } - *is_null = 1; - return 0; + else + { + *is_null = 1; + } + return valOut; } //======================================================================= @@ -1218,9 +1292,9 @@ extern "C" struct covar_pop_data { int64_t cnt; - double sumx; - double sumy; - double sumxy; // sum of (x*y) + long double sumx; + long double sumy; + long double sumxy; // sum of (x*y) }; #ifdef _MSC_VER @@ -1231,8 +1305,8 @@ extern "C" struct covar_pop_data* data; if (args->arg_count != 2) { - strcpy(message,"covar_pop() requires two arguments"); - return 1; + strcpy(message,"covar_pop() requires two arguments"); + return 1; } if (!(isNumeric(args->arg_type[0], args->attributes[0]) && isNumeric(args->arg_type[1], args->attributes[1]))) { @@ -1240,10 +1314,12 @@ extern "C" return 1; } - if (!(data = (struct covar_pop_data*) malloc(sizeof(struct covar_pop_data)))) + initid->decimals = DECIMAL_NOT_SPECIFIED; + + if (!(data = (struct covar_pop_data*) malloc(sizeof(struct covar_pop_data)))) { - strmov(message,"Couldn't allocate memory"); - return 1; + strmov(message,"Couldn't allocate memory"); + return 1; } data->cnt = 0; data->sumx = 0.0; @@ -1306,16 +1382,20 @@ extern "C" { struct covar_pop_data* data = (struct covar_pop_data*)initid->ptr; double N = data->cnt; + double valOut = 0; if (N > 0) { - double sumx = data->sumx; - double sumy = data->sumy; - double sumxy = data->sumxy; - double covar_pop = (sumxy - ((sumx * sumy) / N)) / N; - return covar_pop; + long double sumx = data->sumx; + long double sumy = data->sumy; + long double sumxy = data->sumxy; + long double covar_pop = (sumxy - ((sumx * sumy) / N)) / N ; + valOut = static_cast(covar_pop); } - *is_null = 1; - return 0; + else + { + *is_null = 1; + } + return valOut; } //======================================================================= @@ -1325,9 +1405,9 @@ extern "C" struct covar_samp_data { int64_t cnt; - double sumx; - double sumy; - double sumxy; // sum of (x*y) + long double sumx; + long double sumy; + long double sumxy; // sum of (x*y) }; #ifdef _MSC_VER @@ -1338,8 +1418,8 @@ extern "C" struct covar_samp_data* data; if (args->arg_count != 2) { - strcpy(message,"covar_samp() requires two arguments"); - return 1; + strcpy(message,"covar_samp() requires two arguments"); + return 1; } if (!(isNumeric(args->arg_type[0], args->attributes[0]) && isNumeric(args->arg_type[1], args->attributes[1]))) { @@ -1347,10 +1427,12 @@ extern "C" return 1; } - if (!(data = (struct covar_samp_data*) malloc(sizeof(struct covar_samp_data)))) + initid->decimals = DECIMAL_NOT_SPECIFIED; + + if (!(data = (struct covar_samp_data*) malloc(sizeof(struct covar_samp_data)))) { - strmov(message,"Couldn't allocate memory"); - return 1; + strmov(message,"Couldn't allocate memory"); + return 1; } data->cnt = 0; data->sumx = 0.0; @@ -1413,16 +1495,20 @@ extern "C" { struct covar_samp_data* data = (struct covar_samp_data*)initid->ptr; double N = data->cnt; + double valOut = 0; if (N > 0) { - double sumx = data->sumx; - double sumy = data->sumy; - double sumxy = data->sumxy; - double covar_samp = (sumxy - ((sumx * sumy) / N)) / (N-1); - return covar_samp; + long double sumx = data->sumx; + long double sumy = data->sumy; + long double sumxy = data->sumxy; + long double covar_samp = (sumxy - ((sumx * sumy) / N)) / (N - 1); + valOut = static_cast(covar_samp); } - *is_null = 1; - return 0; + else + { + *is_null = 1; + } + return valOut; } } // vim:ts=4 sw=4: diff --git a/utils/rowgroup/rowaggregation.cpp b/utils/rowgroup/rowaggregation.cpp index 4bfbf87b6..55be66459 100644 --- a/utils/rowgroup/rowaggregation.cpp +++ b/utils/rowgroup/rowaggregation.cpp @@ -218,7 +218,6 @@ inline string getStringNullValue() namespace rowgroup { - const std::string typeStr(""); const static_any::any& RowAggregation::charTypeId((char)1); const static_any::any& RowAggregation::scharTypeId((signed char)1); @@ -590,7 +589,8 @@ inline bool RowAggregation::isNull(const RowGroup* pRowGroup, const Row& row, in RowAggregation::RowAggregation() : fAggMapPtr(NULL), fRowGroupOut(NULL), fTotalRowCount(0), fMaxTotalRowCount(AGG_ROWGROUP_SIZE), - fSmallSideRGs(NULL), fLargeSideRG(NULL), fSmallSideCount(0) + fSmallSideRGs(NULL), fLargeSideRG(NULL), fSmallSideCount(0), + fOrigFunctionCols(NULL) { } @@ -599,7 +599,8 @@ RowAggregation::RowAggregation(const vector& rowAggGroupByCol const vector& rowAggFunctionCols) : fAggMapPtr(NULL), fRowGroupOut(NULL), fTotalRowCount(0), fMaxTotalRowCount(AGG_ROWGROUP_SIZE), - fSmallSideRGs(NULL), fLargeSideRG(NULL), fSmallSideCount(0) + fSmallSideRGs(NULL), fLargeSideRG(NULL), fSmallSideCount(0), + fOrigFunctionCols(NULL) { fGroupByCols.assign(rowAggGroupByCols.begin(), rowAggGroupByCols.end()); fFunctionCols.assign(rowAggFunctionCols.begin(), rowAggFunctionCols.end()); @@ -610,7 +611,7 @@ RowAggregation::RowAggregation(const RowAggregation& rhs): fAggMapPtr(NULL), fRowGroupOut(NULL), fTotalRowCount(0), fMaxTotalRowCount(AGG_ROWGROUP_SIZE), fSmallSideRGs(NULL), fLargeSideRG(NULL), fSmallSideCount(0), - fRGContext(rhs.fRGContext) + fRGContext(rhs.fRGContext), fOrigFunctionCols(NULL) { //fGroupByCols.clear(); //fFunctionCols.clear(); @@ -714,11 +715,8 @@ void RowAggregation::setJoinRowGroups(vector* pSmallSideRG, RowGroup* // threads on the PM and by multple threads on the UM. It must remain // thread safe. //------------------------------------------------------------------------------ -void RowAggregation::resetUDAF(uint64_t funcColID) +void RowAggregation::resetUDAF(RowUDAFFunctionCol* rowUDAF) { - // Get the UDAF class pointer and store in the row definition object. - RowUDAFFunctionCol* rowUDAF = dynamic_cast(fFunctionCols[funcColID].get()); - // RowAggregation and it's functions need to be re-entrant which means // each instance (thread) needs its own copy of the context object. // Note: operator=() doesn't copy userData. @@ -786,7 +784,7 @@ void RowAggregation::initialize() { if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF) { - resetUDAF(i); + resetUDAF(dynamic_cast(fFunctionCols[i].get())); } } } @@ -838,7 +836,7 @@ void RowAggregation::aggReset() { if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF) { - resetUDAF(i); + resetUDAF(dynamic_cast(fFunctionCols[i].get())); } } } @@ -885,14 +883,28 @@ void RowAggregationUM::aggregateRowWithRemap(Row& row) inserted.first->second = RowPosition(fResultDataVec.size() - 1, fRowGroupOut->getRowCount() - 1); // If there's UDAF involved, reset the user data. - for (uint64_t i = 0; i < fFunctionCols.size(); i++) + if (fOrigFunctionCols) { - if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF) + // This is a multi-distinct query and fFunctionCols may not + // contain all the UDAF we need to reset + for (uint64_t i = 0; i < fOrigFunctionCols->size(); i++) { - resetUDAF(i); + if ((*fOrigFunctionCols)[i]->fAggFunction == ROWAGG_UDAF) + { + resetUDAF(dynamic_cast((*fOrigFunctionCols)[i].get())); + } + } + } + else + { + for (uint64_t i = 0; i < fFunctionCols.size(); i++) + { + if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF) + { + resetUDAF(dynamic_cast(fFunctionCols[i].get())); + } } } - // replace the key value with an equivalent copy, yes this is OK const_cast((inserted.first->first)) = pos; } @@ -946,14 +958,28 @@ void RowAggregation::aggregateRow(Row& row) RowPosition(fResultDataVec.size() - 1, fRowGroupOut->getRowCount() - 1); // If there's UDAF involved, reset the user data. - for (uint64_t i = 0; i < fFunctionCols.size(); i++) + if (fOrigFunctionCols) { - if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF) + // This is a multi-distinct query and fFunctionCols may not + // contain all the UDAF we need to reset + for (uint64_t i = 0; i < fOrigFunctionCols->size(); i++) { - resetUDAF(i); + if ((*fOrigFunctionCols)[i]->fAggFunction == ROWAGG_UDAF) + { + resetUDAF(dynamic_cast((*fOrigFunctionCols)[i].get())); + } + } + } + else + { + for (uint64_t i = 0; i < fFunctionCols.size(); i++) + { + if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF) + { + resetUDAF(dynamic_cast(fFunctionCols[i].get())); + } } } - } else { @@ -4699,7 +4725,7 @@ void RowAggregationMultiDistinct::doDistinctAggregation() { // backup the function column vector for finalize(). vector origFunctionCols = fFunctionCols; - + fOrigFunctionCols = &origFunctionCols; // aggregate data from each sub-aggregator to distinct aggregator for (uint64_t i = 0; i < fSubAggregators.size(); ++i) { @@ -4727,6 +4753,7 @@ void RowAggregationMultiDistinct::doDistinctAggregation() // restore the function column vector fFunctionCols = origFunctionCols; + fOrigFunctionCols = NULL; } @@ -4734,7 +4761,8 @@ void RowAggregationMultiDistinct::doDistinctAggregation_rowVec(vector origFunctionCols = fFunctionCols; - + fOrigFunctionCols = &origFunctionCols; + // aggregate data from each sub-aggregator to distinct aggregator for (uint64_t i = 0; i < fSubAggregators.size(); ++i) { @@ -4751,9 +4779,9 @@ void RowAggregationMultiDistinct::doDistinctAggregation_rowVec(vectorclear(); } - void resetUDAF(uint64_t funcColID); + void resetUDAF(RowUDAFFunctionCol* rowUDAF); inline bool isNull(const RowGroup* pRowGroup, const Row& row, int64_t col); inline void makeAggFieldsNull(Row& row); @@ -710,6 +710,9 @@ protected: static const static_any::any& doubleTypeId; static const static_any::any& longdoubleTypeId; static const static_any::any& strTypeId; + + // For UDAF along with with multiple distinct columns + vector* fOrigFunctionCols; }; //------------------------------------------------------------------------------ diff --git a/utils/rowgroup/rowgroup.h b/utils/rowgroup/rowgroup.h index 27f0b98a4..b91e0f0ef 100644 --- a/utils/rowgroup/rowgroup.h +++ b/utils/rowgroup/rowgroup.h @@ -1176,7 +1176,12 @@ inline bool Row::equals(const Row& r2, const std::vector& keyCols) con if (!isLongString(col)) { - if (getUintField(col) != r2.getUintField(col)) + if (getColType(i) == execplan::CalpontSystemCatalog::LONGDOUBLE) + { + if (getLongDoubleField(i) != r2.getLongDoubleField(i)) + return false; + } + else if (getUintField(col) != r2.getUintField(col)) return false; } else @@ -1204,7 +1209,12 @@ inline bool Row::equals(const Row& r2, uint32_t lastCol) const for (uint32_t i = 0; i <= lastCol; i++) if (!isLongString(i)) { - if (getUintField(i) != r2.getUintField(i)) + if (getColType(i) == execplan::CalpontSystemCatalog::LONGDOUBLE) + { + if (getLongDoubleField(i) != r2.getLongDoubleField(i)) + return false; + } + else if (getUintField(i) != r2.getUintField(i)) return false; } else diff --git a/writeengine/bulk/we_colbufmgr.cpp b/writeengine/bulk/we_colbufmgr.cpp index 9e6e78dc7..36f3e5493 100644 --- a/writeengine/bulk/we_colbufmgr.cpp +++ b/writeengine/bulk/we_colbufmgr.cpp @@ -45,7 +45,7 @@ namespace { // Minimum time to wait for a condition, so as to periodically wake up and // check the global job status, to see if the job needs to terminate. -const int COND_WAIT_SECONDS = 3; +const int COND_WAIT_SECONDS = 1; } namespace WriteEngine diff --git a/writeengine/dictionary/we_dctnry.cpp b/writeengine/dictionary/we_dctnry.cpp index dfc5622b2..3b77dae7a 100644 --- a/writeengine/dictionary/we_dctnry.cpp +++ b/writeengine/dictionary/we_dctnry.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -108,7 +109,6 @@ Dctnry::Dctnry() : &m_endHeader, HDR_UNIT_SIZE); m_curFbo = INVALID_NUM; m_curLbid = INVALID_LBID; - memset(m_sigArray, 0, MAX_STRING_CACHE_SIZE * sizeof(Signature)); m_arraySize = 0; clear();//files @@ -130,14 +130,16 @@ Dctnry::~Dctnry() ******************************************************************************/ void Dctnry::freeStringCache( ) { - for (int i = 0; i < m_arraySize; i++) + std::set::iterator it; + for (it=m_sigArray.begin(); it!=m_sigArray.end(); it++) { - delete [] m_sigArray[i].signature; - m_sigArray[i].signature = 0; + Signature sig = *it; + delete [] sig.signature; + sig.signature = 0; } - memset(m_sigArray, 0, MAX_STRING_CACHE_SIZE * sizeof(Signature)); m_arraySize = 0; + m_sigArray.clear(); } /******************************************************************************* @@ -161,7 +163,6 @@ int Dctnry::init() m_curOp = 0; memset( m_curBlock.data, 0, sizeof(m_curBlock.data)); m_curBlock.lbid = INVALID_LBID; - memset(m_sigArray, 0, MAX_STRING_CACHE_SIZE * sizeof(Signature)); m_arraySize = 0; return NO_ERROR; @@ -627,19 +628,17 @@ int Dctnry::openDctnry(const OID& dctnryOID, ******************************************************************************/ bool Dctnry::getTokenFromArray(Signature& sig) { - for (int i = 0; i < (int)m_arraySize ; i++ ) - { - if (sig.size == m_sigArray[i].size) - { - if (!memcmp(sig.signature, m_sigArray[i].signature, sig.size)) - { - sig.token = m_sigArray[i].token; - return true; - }//endif sig compare - }//endif size compare - } + std::set::iterator it; + it = m_sigArray.find(sig); + if ( it == m_sigArray.end()){ + return false; + }else{ + Signature sigfound = *it; + sig.token = sigfound.token; + return true; + } - return false; + return false; } /******************************************************************************* @@ -1333,7 +1332,7 @@ void Dctnry::preLoadStringCache( const DataBlock& fileBlock ) memcpy(aSig.signature, &fileBlock.data[offBeg], len); aSig.token.op = op; aSig.token.fbo = m_curLbid; - m_sigArray[op - 1] = aSig; + m_sigArray.insert(aSig); offEnd = offBeg; hdrOffsetBeg += HDR_UNIT_SIZE; @@ -1372,7 +1371,7 @@ void Dctnry::addToStringCache( const Signature& newSig ) memcpy(asig.signature, newSig.signature, newSig.size ); asig.size = newSig.size; asig.token = newSig.token; - m_sigArray[m_arraySize] = asig; + m_sigArray.insert(asig); m_arraySize++; } @@ -1465,7 +1464,7 @@ int Dctnry::updateDctnry(unsigned char* sigValue, int& sigSize, sig.signature = new unsigned char[sigSize]; memcpy (sig.signature, sigValue, sigSize); sig.token = token; - m_sigArray[m_arraySize] = sig; + m_sigArray.insert(sig); m_arraySize++; } diff --git a/writeengine/dictionary/we_dctnry.h b/writeengine/dictionary/we_dctnry.h index 2b7686696..93135da4d 100644 --- a/writeengine/dictionary/we_dctnry.h +++ b/writeengine/dictionary/we_dctnry.h @@ -56,6 +56,17 @@ typedef struct Signature Token token; } Signature; +struct sig_compare { + bool operator() (const Signature& a, const Signature& b) const { + if (a.size == b.size){ + return memcmp(a.signature,b.signature,a.size)<0;} + else if (a.size& oids); virtual int numOfBlocksInFile(); - Signature m_sigArray[MAX_STRING_CACHE_SIZE]; // string cache + std::set m_sigArray; int m_arraySize; // num strings in m_sigArray // m_dctnryHeader used for hdr when readSubBlockEntry is used to read a blk diff --git a/writeengine/dictionary/we_dctnrystore.cpp b/writeengine/dictionary/we_dctnrystore.cpp index 8e2982b57..4a7fefe73 100644 --- a/writeengine/dictionary/we_dctnrystore.cpp +++ b/writeengine/dictionary/we_dctnrystore.cpp @@ -133,7 +133,7 @@ const int DctnryStore::updateDctnryStore(unsigned char* sigValue, sig.signature = new unsigned char[sigSize]; memcpy (sig.signature, sigValue, sigSize); sig.token = token; - m_dctnry.m_sigArray[m_dctnry.m_arraySize] = sig; + m_dctnry.m_sigArray.insert(sig) = sig; m_dctnry.m_arraySize++; }