1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-09-01 01:22:04 +03:00

Merge branch 'develop' into MCOL-3536

This commit is contained in:
David Hall
2019-10-08 12:00:20 -05:00
27 changed files with 1143 additions and 82 deletions

8
.gitignore vendored
View File

@@ -113,3 +113,11 @@ columnstoreversion.h
tags tags
*.orig *.orig
*.diff *.diff
oam/install_scripts/columnstore
oam/install_scripts/columnstoreInstall.sh
oam/install_scripts/columnstoreLogRotate
oam/install_scripts/post-install
oam/install_scripts/postInstall.sh
oam/install_scripts/pre-uninstall
oam/install_scripts/syslogSetup.sh
tools/setConfig/configxml.sh

View File

@@ -1,10 +1,10 @@
# MariaDB ColumnStore Storage/Execution engine 1.2 # MariaDB ColumnStore Storage/Execution engine 1.4
MariaDB ColumnStore 1.2 is the GA version of MariaDB ColumnStore. MariaDB ColumnStore 1.4 is a Beta version of MariaDB ColumnStore.
It is built by porting InfiniDB 4.6.7 on MariaDB 10.2 and adding entirely It is built by porting InfiniDB 4.6.7 on MariaDB and adding entirely
new features not found anywhere else. new features not found anywhere else.
# MariaDB ColumnStore 1.2 is a GA release. # MariaDB ColumnStore 1.4 is a Beta release.
- Do not use pre-releases on production systems. - Do not use pre-releases on production systems.

View File

@@ -1,4 +1,4 @@
COLUMNSTORE_VERSION_MAJOR=1 COLUMNSTORE_VERSION_MAJOR=1
COLUMNSTORE_VERSION_MINOR=4 COLUMNSTORE_VERSION_MINOR=4
COLUMNSTORE_VERSION_PATCH=0 COLUMNSTORE_VERSION_PATCH=1
COLUMNSTORE_VERSION_RELEASE=1 COLUMNSTORE_VERSION_RELEASE=1

View File

@@ -112,7 +112,7 @@ ostream& operator<<(ostream& output, const FunctionColumn& rhs)
const string FunctionColumn::toString() const const string FunctionColumn::toString() const
{ {
ostringstream output; ostringstream output;
output << "FunctionColumn: " << fFunctionName << endl; output << std::endl << "FunctionColumn: " << fFunctionName << endl;
if (fAlias.length() > 0) output << "/Alias: " << fAlias; if (fAlias.length() > 0) output << "/Alias: " << fAlias;

2
dbcon/joblist/windowfunctionstep.cpp Normal file → Executable file
View File

@@ -522,7 +522,7 @@ void WindowFunctionStep::checkWindowFunction(CalpontSelectExecutionPlan* csep, J
igpc < csep->groupByCols().end(); igpc < csep->groupByCols().end();
++igpc) ++igpc)
{ {
if (*igpc->get() == *j->get()) if ((*igpc)->alias() == (*j)->alias())
{ {
bFound = true; bFound = true;
break; break;

View File

@@ -6,6 +6,7 @@ include_directories( ${ENGINE_COMMON_INCLUDES}
SET ( libcalmysql_SRCS SET ( libcalmysql_SRCS
ha_mcs_sysvars.cpp ha_mcs_sysvars.cpp
ha_mcs_client_udfs.cpp ha_mcs_client_udfs.cpp
ha_mcs_pushdown.cpp
ha_calpont.cpp ha_calpont.cpp
ha_calpont_impl.cpp ha_calpont_impl.cpp
ha_calpont_dml.cpp ha_calpont_dml.cpp

View File

@@ -20,9 +20,9 @@
#include "ha_calpont.h" #include "ha_calpont.h"
#include "columnstoreversion.h" #include "columnstoreversion.h"
#include "ha_mcs_pushdown.h"
#define NEED_CALPONT_EXTERNS #define NEED_CALPONT_EXTERNS
#include "ha_calpont_impl.h" #include "ha_calpont_impl.h"
#include "ha_mcs_pushdown.h"
static handler* calpont_create_handler(handlerton* hton, static handler* calpont_create_handler(handlerton* hton,
TABLE_SHARE* table, TABLE_SHARE* table,
@@ -36,13 +36,13 @@ handlerton* mcs_hton;
// handlers creation function for hton. // handlers creation function for hton.
// Look into ha_mcs_pushdown.* for more details. // Look into ha_mcs_pushdown.* for more details.
static group_by_handler* group_by_handler*
create_calpont_group_by_handler(THD* thd, Query* query); create_calpont_group_by_handler(THD* thd, Query* query);
static derived_handler* derived_handler*
create_columnstore_derived_handler(THD* thd, TABLE_LIST *derived); create_columnstore_derived_handler(THD* thd, TABLE_LIST *derived);
static select_handler* select_handler*
create_columnstore_select_handler(THD* thd, SELECT_LEX* sel); create_columnstore_select_handler(THD* thd, SELECT_LEX* sel);
/* Variables for example share methods */ /* Variables for example share methods */
@@ -890,7 +890,6 @@ int ha_calpont::create(const char* name, TABLE* table_arg,
DBUG_ENTER("ha_calpont::create"); DBUG_ENTER("ha_calpont::create");
int rc = ha_calpont_impl_create(name, table_arg, create_info); int rc = ha_calpont_impl_create(name, table_arg, create_info);
// table_arg->s->write_frm_image();
DBUG_RETURN(rc); DBUG_RETURN(rc);
} }
@@ -904,8 +903,6 @@ const COND* ha_calpont::cond_push(const COND* cond)
struct st_mysql_storage_engine columnstore_storage_engine = struct st_mysql_storage_engine columnstore_storage_engine =
{ MYSQL_HANDLERTON_INTERFACE_VERSION }; { MYSQL_HANDLERTON_INTERFACE_VERSION };
#include "ha_mcs_pushdown.cpp"
mysql_declare_plugin(columnstore) mysql_declare_plugin(columnstore)
{ {
MYSQL_STORAGE_ENGINE_PLUGIN, MYSQL_STORAGE_ENGINE_PLUGIN,

9
dbcon/mysql/ha_calpont_execplan.cpp Normal file → Executable file
View File

@@ -361,7 +361,8 @@ bool sortItemIsInGrouping(Item* sort_item, ORDER* groupcol)
Item_func *ifp = reinterpret_cast<Item_func*>(sort_item); Item_func *ifp = reinterpret_cast<Item_func*>(sort_item);
ifp->traverse_cond(check_sum_func_item, &found, Item::POSTFIX); ifp->traverse_cond(check_sum_func_item, &found, Item::POSTFIX);
} }
else if (sort_item->type() == Item::CONST_ITEM) else if (sort_item->type() == Item::CONST_ITEM ||
sort_item->type() == Item::WINDOW_FUNC_ITEM)
{ {
found = true; found = true;
} }
@@ -3175,7 +3176,7 @@ ReturnedColumn* buildReturnedColumn(Item* item, gp_walk_info& gwi, bool& nonSupp
String val, *str = item->val_str(&val); String val, *str = item->val_str(&val);
string valStr; string valStr;
valStr.assign(str->ptr(), str->length()); valStr.assign(str->ptr(), str->length());
rc = new ConstantColumn(valStr, ConstantColumn::NUM); rc = new ConstantColumn(valStr);
break; break;
} }
case REAL_RESULT: case REAL_RESULT:
@@ -4950,6 +4951,10 @@ ReturnedColumn* buildAggregateColumn(Item* item, gp_walk_info& gwi)
mcsv1sdk::mcsv1Context& context = udafc->getContext(); mcsv1sdk::mcsv1Context& context = udafc->getContext();
context.setName(isp->func_name()); context.setName(isp->func_name());
// Get the return type as defined by CREATE AGGREGATE FUNCTION
// Most functions don't care, but some may.
context.setMariaDBReturnType((mcsv1sdk::enum_mariadb_return_type)isp->field_type());
// Set up the return type defaults for the call to init() // Set up the return type defaults for the call to init()
context.setResultType(udafc->resultType().colDataType); context.setResultType(udafc->resultType().colDataType);
context.setColWidth(udafc->resultType().colWidth); context.setColWidth(udafc->resultType().colWidth);

View File

@@ -1359,7 +1359,9 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
if (value->type() == Item::CONST_ITEM) if (value->type() == Item::CONST_ITEM)
{ {
if (value->cmp_type() == STRING_RESULT) if (value->cmp_type() == STRING_RESULT ||
value->cmp_type() == DECIMAL_RESULT ||
value->cmp_type() == REAL_RESULT)
{ {
//@Bug 2587 use val_str to replace value->name to get rid of 255 limit //@Bug 2587 use val_str to replace value->name to get rid of 255 limit
String val, *str; String val, *str;
@@ -4152,8 +4154,10 @@ int ha_calpont_impl_rnd_pos(uchar* buf, uchar* pos)
* 0 if success * 0 if success
* others if something went wrong whilst getting the result set * others if something went wrong whilst getting the result set
***********************************************************/ ***********************************************************/
int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE* table) int ha_calpont_impl_group_by_init(mcs_handler_info *handler_info, TABLE* table)
{ {
ha_calpont_group_by_handler *group_hand=
reinterpret_cast<ha_calpont_group_by_handler*>(handler_info->hndl_ptr);
string tableName = group_hand->table_list->table->s->table_name.str; string tableName = group_hand->table_list->table->s->table_name.str;
IDEBUG( cout << "group_by_init for table " << tableName << endl ); IDEBUG( cout << "group_by_init for table " << tableName << endl );
THD* thd = current_thd; THD* thd = current_thd;
@@ -4590,7 +4594,7 @@ internal_error:
* HA_ERR_END_OF_FILE if the record set has come to an end * HA_ERR_END_OF_FILE if the record set has come to an end
* others if something went wrong whilst getting the result set * others if something went wrong whilst getting the result set
***********************************************************/ ***********************************************************/
int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE* table) int ha_calpont_impl_group_by_next(TABLE* table)
{ {
THD* thd = current_thd; THD* thd = current_thd;
@@ -4678,7 +4682,7 @@ int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE
return rc; return rc;
} }
int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* table) int ha_calpont_impl_group_by_end(TABLE* table)
{ {
int rc = 0; int rc = 0;
THD* thd = current_thd; THD* thd = current_thd;

View File

@@ -20,9 +20,10 @@
#define HA_CALPONT_IMPL_H__ #define HA_CALPONT_IMPL_H__
#include "idb_mysql.h" #include "idb_mysql.h"
#include "ha_mcs_pushdown.h"
#ifdef NEED_CALPONT_EXTERNS #ifdef NEED_CALPONT_EXTERNS
// Forward declaration.
struct mcs_handler_info;
extern int ha_calpont_impl_discover_existence(const char* schema, const char* name); extern int ha_calpont_impl_discover_existence(const char* schema, const char* name);
extern int ha_calpont_impl_create(const char* name, TABLE* table_arg, HA_CREATE_INFO* create_info); extern int ha_calpont_impl_create(const char* name, TABLE* table_arg, HA_CREATE_INFO* create_info);
extern int ha_calpont_impl_delete_table(const char* name); extern int ha_calpont_impl_delete_table(const char* name);
@@ -43,11 +44,11 @@ extern int ha_calpont_impl_external_lock(THD* thd, TABLE* table, int lock_type);
extern int ha_calpont_impl_update_row(); extern int ha_calpont_impl_update_row();
extern int ha_calpont_impl_delete_row(); extern int ha_calpont_impl_delete_row();
extern int ha_calpont_impl_rnd_pos(uchar* buf, uchar* pos); extern int ha_calpont_impl_rnd_pos(uchar* buf, uchar* pos);
extern int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE* table);
extern int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE* table);
extern int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* table);
extern int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table); extern int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table);
extern int ha_cs_impl_select_next(uchar *buf, TABLE *table); extern int ha_cs_impl_select_next(uchar *buf, TABLE *table);
extern int ha_calpont_impl_group_by_init(mcs_handler_info *handler_info, TABLE* table);
extern int ha_calpont_impl_group_by_next(TABLE* table);
extern int ha_calpont_impl_group_by_end(TABLE* table);
#endif #endif
@@ -55,7 +56,6 @@ extern int ha_cs_impl_select_next(uchar *buf, TABLE *table);
#include "ha_calpont_impl_if.h" #include "ha_calpont_impl_if.h"
#include "calpontsystemcatalog.h" #include "calpontsystemcatalog.h"
#include "ha_calpont.h" #include "ha_calpont.h"
#include "ha_mcs_pushdown.h"
extern int ha_calpont_impl_rename_table_(const char* from, const char* to, cal_impl_if::cal_connection_info& ci); extern int ha_calpont_impl_rename_table_(const char* from, const char* to, cal_impl_if::cal_connection_info& ci);
extern int ha_calpont_impl_write_row_(const uchar* buf, TABLE* table, cal_impl_if::cal_connection_info& ci, ha_rows& rowsInserted); extern int ha_calpont_impl_write_row_(const uchar* buf, TABLE* table, cal_impl_if::cal_connection_info& ci, ha_rows& rowsInserted);
extern int ha_calpont_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::cal_connection_info& ci); extern int ha_calpont_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::cal_connection_info& ci);
@@ -72,10 +72,6 @@ extern std::string ha_calpont_impl_droppartition_ (execplan::CalpontSystemCatal
extern std::string ha_calpont_impl_viewtablelock( cal_impl_if::cal_connection_info& ci, execplan::CalpontSystemCatalog::TableName& tablename); extern std::string ha_calpont_impl_viewtablelock( cal_impl_if::cal_connection_info& ci, execplan::CalpontSystemCatalog::TableName& tablename);
extern std::string ha_calpont_impl_cleartablelock( cal_impl_if::cal_connection_info& ci, uint64_t tableLockID); extern std::string ha_calpont_impl_cleartablelock( cal_impl_if::cal_connection_info& ci, uint64_t tableLockID);
extern int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE* table);
extern int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE* table);
extern int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* table);
extern int ha_cs_impl_derived_next(TABLE* table);
#endif #endif
#endif #endif

View File

@@ -14,7 +14,10 @@
along with this program; if not, write to the Free Software along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
// ha_calpont.cpp includes this file. #include <typeinfo>
#include <string>
#include "ha_mcs_pushdown.h"
void check_walk(const Item* item, void* arg); void check_walk(const Item* item, void* arg);
@@ -25,7 +28,6 @@ void mutate_optimizer_flags(THD *thd_)
// in SH::scan_init() // in SH::scan_init()
set_original_optimizer_flags(thd_->variables.optimizer_switch, thd_); set_original_optimizer_flags(thd_->variables.optimizer_switch, thd_);
thd_->variables.optimizer_switch = OPTIMIZER_SWITCH_IN_TO_EXISTS | thd_->variables.optimizer_switch = OPTIMIZER_SWITCH_IN_TO_EXISTS |
OPTIMIZER_SWITCH_EXISTS_TO_IN |
OPTIMIZER_SWITCH_COND_PUSHDOWN_FOR_DERIVED | OPTIMIZER_SWITCH_COND_PUSHDOWN_FOR_DERIVED |
OPTIMIZER_SWITCH_COND_PUSHDOWN_FROM_HAVING; OPTIMIZER_SWITCH_COND_PUSHDOWN_FROM_HAVING;
} }
@@ -339,7 +341,7 @@ void item_check(Item* item, bool* unsupported_feature)
* group_by_handler if success * group_by_handler if success
* NULL in other case * NULL in other case
***********************************************************/ ***********************************************************/
static group_by_handler* group_by_handler*
create_calpont_group_by_handler(THD* thd, Query* query) create_calpont_group_by_handler(THD* thd, Query* query)
{ {
ha_calpont_group_by_handler* handler = NULL; ha_calpont_group_by_handler* handler = NULL;
@@ -439,7 +441,7 @@ create_calpont_group_by_handler(THD* thd, Query* query)
* derived_handler if possible * derived_handler if possible
* NULL in other case * NULL in other case
***********************************************************/ ***********************************************************/
static derived_handler* derived_handler*
create_columnstore_derived_handler(THD* thd, TABLE_LIST *derived) create_columnstore_derived_handler(THD* thd, TABLE_LIST *derived)
{ {
ha_columnstore_derived_handler* handler = NULL; ha_columnstore_derived_handler* handler = NULL;
@@ -659,7 +661,8 @@ int ha_calpont_group_by_handler::init_scan()
{ {
DBUG_ENTER("ha_calpont_group_by_handler::init_scan"); DBUG_ENTER("ha_calpont_group_by_handler::init_scan");
int rc = ha_calpont_impl_group_by_init(this, table); mcs_handler_info mhi = mcs_handler_info(reinterpret_cast<void*>(this), GROUP_BY);
int rc = ha_calpont_impl_group_by_init(&mhi, table);
DBUG_RETURN(rc); DBUG_RETURN(rc);
} }
@@ -673,7 +676,7 @@ int ha_calpont_group_by_handler::init_scan()
int ha_calpont_group_by_handler::next_row() int ha_calpont_group_by_handler::next_row()
{ {
DBUG_ENTER("ha_calpont_group_by_handler::next_row"); DBUG_ENTER("ha_calpont_group_by_handler::next_row");
int rc = ha_calpont_impl_group_by_next(this, table); int rc = ha_calpont_impl_group_by_next(table);
DBUG_RETURN(rc); DBUG_RETURN(rc);
} }
@@ -687,8 +690,7 @@ int ha_calpont_group_by_handler::next_row()
int ha_calpont_group_by_handler::end_scan() int ha_calpont_group_by_handler::end_scan()
{ {
DBUG_ENTER("ha_calpont_group_by_handler::end_scan"); DBUG_ENTER("ha_calpont_group_by_handler::end_scan");
int rc = ha_calpont_impl_group_by_end(table);
int rc = ha_calpont_impl_group_by_end(this, table);
DBUG_RETURN(rc); DBUG_RETURN(rc);
} }
@@ -706,7 +708,7 @@ int ha_calpont_group_by_handler::end_scan()
* select_handler if possible * select_handler if possible
* NULL in other case * NULL in other case
***********************************************************/ ***********************************************************/
static select_handler* select_handler*
create_columnstore_select_handler(THD* thd, SELECT_LEX* select_lex) create_columnstore_select_handler(THD* thd, SELECT_LEX* select_lex)
{ {
ha_columnstore_select_handler* handler = NULL; ha_columnstore_select_handler* handler = NULL;
@@ -714,7 +716,12 @@ create_columnstore_select_handler(THD* thd, SELECT_LEX* select_lex)
// MCOL-2178 Disable SP support in the select_handler for now. // MCOL-2178 Disable SP support in the select_handler for now.
// Check the session variable value to enable/disable use of // Check the session variable value to enable/disable use of
// select_handler // select_handler
if (!get_select_handler(thd) || (thd->lex)->sphead) // Disable processing of select_result_interceptor classes
// which intercept and transform result set rows. E.g.:
// select a,b into @a1, @a2 from t1;
if (!get_select_handler(thd) || (thd->lex)->sphead ||
((thd->lex)->result &&
!((select_dumpvar *)(thd->lex)->result)->var_list.is_empty()))
{ {
return handler; return handler;
} }
@@ -797,6 +804,7 @@ create_columnstore_select_handler(THD* thd, SELECT_LEX* select_lex)
} }
} }
if (!unsupported_feature) if (!unsupported_feature)
{ {
handler= new ha_columnstore_select_handler(thd, select_lex); handler= new ha_columnstore_select_handler(thd, select_lex);

View File

@@ -20,6 +20,9 @@
#include "idb_mysql.h" #include "idb_mysql.h"
#include "ha_calpont.h" #include "ha_calpont.h"
#include "ha_mcs_sysvars.h"
#define NEED_CALPONT_EXTERNS
#include "ha_calpont_impl.h"
void mutate_optimizer_flags(THD *thd_); void mutate_optimizer_flags(THD *thd_);
void restore_optimizer_flags(THD *thd_); void restore_optimizer_flags(THD *thd_);

View File

@@ -30,7 +30,7 @@ INSTALL PLUGIN columnstore_tables SONAME 'is_columnstore_tables.so';
INSTALL PLUGIN columnstore_columns SONAME 'is_columnstore_columns.so'; INSTALL PLUGIN columnstore_columns SONAME 'is_columnstore_columns.so';
INSTALL PLUGIN columnstore_extents SONAME 'is_columnstore_extents.so'; INSTALL PLUGIN columnstore_extents SONAME 'is_columnstore_extents.so';
INSTALL PLUGIN columnstore_files SONAME 'is_columnstore_files.so'; INSTALL PLUGIN columnstore_files SONAME 'is_columnstore_files.so';
INSERT INTO mysql.func VALUES ('calgetstats',0,'libcalmysql.so','function'),('calsettrace',2,'libcalmysql.so','function'),('calsetparms',0,'libcalmysql.so','function'),('calflushcache',2,'libcalmysql.so','function'),('calgettrace',0,'libcalmysql.so','function'),('calgetversion',0,'libcalmysql.so','function'),('calonlinealter',2,'libcalmysql.so','function'),('calviewtablelock',0,'libcalmysql.so','function'),('calcleartablelock',0,'libcalmysql.so','function'),('callastinsertid',2,'libcalmysql.so','function'),('calgetsqlcount',0,'libcalmysql.so','function'),('idbpm',2,'libcalmysql.so','function'),('idbdbroot',2,'libcalmysql.so','function'),('idbsegment',2,'libcalmysql.so','function'),('idbsegmentdir',2,'libcalmysql.so','function'),('idbextentrelativerid',2,'libcalmysql.so','function'),('idbblockid',2,'libcalmysql.so','function'),('idbextentid',2,'libcalmysql.so','function'),('idbextentmin',0,'libcalmysql.so','function'),('idbextentmax',0,'libcalmysql.so','function'),('idbpartition',0,'libcalmysql.so','function'),('idblocalpm',2,'libcalmysql.so','function'),('mcssystemready',2,'libcalmysql.so','function'),('mcssystemreadonly',2,'libcalmysql.so','function'),('mcssystemprimary',2,'libcalmysql.so','function'),('regr_avgx',1,'libregr_mysql.so','aggregate'),('regr_avgy',1,'libregr_mysql.so','aggregate'),('regr_count',2,'libregr_mysql.so','aggregate'),('regr_slope',1,'libregr_mysql.so','aggregate'),('regr_intercept',1,'libregr_mysql.so','aggregate'),('regr_r2',1,'libregr_mysql.so','aggregate'),('corr',1,'libregr_mysql.so','aggregate'),('regr_sxx',1,'libregr_mysql.so','aggregate'),('regr_syy',1,'libregr_mysql.so','aggregate'),('regr_sxy',1,'libregr_mysql.so','aggregate'),('covar_pop',1,'libregr_mysql.so','aggregate'),('covar_samp',1,'libregr_mysql.so','aggregate'),('distinct_count',2,'libudf_mysql.so','aggregate'),('caldisablepartitions',0,'libcalmysql.so','function'),('calenablepartitions',0,'libcalmysql.so','function'),('caldroppartitions',0,'libcalmysql.so','function'),('calshowpartitions',0,'libcalmysql.so','function'),('caldroppartitionsbyvalue',0,'libcalmysql.so','function'),('caldisablepartitionsbyvalue',0,'libcalmysql.so','function'),('calenablepartitionsbyvalue',0,'libcalmysql.so','function'),('calshowpartitionsbyvalue',0,'libcalmysql.so','function'); INSERT INTO mysql.func VALUES ('calgetstats',0,'libcalmysql.so','function'),('calsettrace',2,'libcalmysql.so','function'),('calsetparms',0,'libcalmysql.so','function'),('calflushcache',2,'libcalmysql.so','function'),('calgettrace',0,'libcalmysql.so','function'),('calgetversion',0,'libcalmysql.so','function'),('calonlinealter',2,'libcalmysql.so','function'),('calviewtablelock',0,'libcalmysql.so','function'),('calcleartablelock',0,'libcalmysql.so','function'),('callastinsertid',2,'libcalmysql.so','function'),('calgetsqlcount',0,'libcalmysql.so','function'),('idbpm',2,'libcalmysql.so','function'),('idbdbroot',2,'libcalmysql.so','function'),('idbsegment',2,'libcalmysql.so','function'),('idbsegmentdir',2,'libcalmysql.so','function'),('idbextentrelativerid',2,'libcalmysql.so','function'),('idbblockid',2,'libcalmysql.so','function'),('idbextentid',2,'libcalmysql.so','function'),('idbextentmin',0,'libcalmysql.so','function'),('idbextentmax',0,'libcalmysql.so','function'),('idbpartition',0,'libcalmysql.so','function'),('idblocalpm',2,'libcalmysql.so','function'),('mcssystemready',2,'libcalmysql.so','function'),('mcssystemreadonly',2,'libcalmysql.so','function'),('mcssystemprimary',2,'libcalmysql.so','function'),('regr_avgx',1,'libregr_mysql.so','aggregate'),('regr_avgy',1,'libregr_mysql.so','aggregate'),('regr_count',2,'libregr_mysql.so','aggregate'),('regr_slope',1,'libregr_mysql.so','aggregate'),('regr_intercept',1,'libregr_mysql.so','aggregate'),('regr_r2',1,'libregr_mysql.so','aggregate'),('corr',1,'libregr_mysql.so','aggregate'),('regr_sxx',1,'libregr_mysql.so','aggregate'),('regr_syy',1,'libregr_mysql.so','aggregate'),('regr_sxy',1,'libregr_mysql.so','aggregate'),('covar_pop',1,'libregr_mysql.so','aggregate'),('covar_samp',1,'libregr_mysql.so','aggregate'),('moda',4,'libregr_mysql.so','aggregate'),('distinct_count',2,'libudf_mysql.so','aggregate'),('caldisablepartitions',0,'libcalmysql.so','function'),('calenablepartitions',0,'libcalmysql.so','function'),('caldroppartitions',0,'libcalmysql.so','function'),('calshowpartitions',0,'libcalmysql.so','function'),('caldroppartitionsbyvalue',0,'libcalmysql.so','function'),('caldisablepartitionsbyvalue',0,'libcalmysql.so','function'),('calenablepartitionsbyvalue',0,'libcalmysql.so','function'),('calshowpartitionsbyvalue',0,'libcalmysql.so','function');
CREATE DATABASE IF NOT EXISTS infinidb_vtable; CREATE DATABASE IF NOT EXISTS infinidb_vtable;
CREATE DATABASE IF NOT EXISTS infinidb_querystats; CREATE DATABASE IF NOT EXISTS infinidb_querystats;

View File

@@ -127,6 +127,7 @@ if [ -n "$plugin" ]; then
setenv=`$COLUMNSTORE_INSTALL_DIR/bin/getConfig SystemConfig DataFileEnvFile` setenv=`$COLUMNSTORE_INSTALL_DIR/bin/getConfig SystemConfig DataFileEnvFile`
if [ -n "$setenv" ]; then
eval userhome=~$user eval userhome=~$user
bashFile=$userhome/.bashrc bashFile=$userhome/.bashrc
touch ${bashFile} touch ${bashFile}
@@ -134,6 +135,7 @@ if [ -n "$plugin" ]; then
echo " " >> ${bashFile} echo " " >> ${bashFile}
echo ". $COLUMNSTORE_INSTALL_DIR/bin/$setenv" >> ${bashFile} echo ". $COLUMNSTORE_INSTALL_DIR/bin/$setenv" >> ${bashFile}
fi fi
fi
# if mysqlrep is on and module has a my.cnf file, upgrade it # if mysqlrep is on and module has a my.cnf file, upgrade it

View File

@@ -323,19 +323,13 @@ if [ -z "$hadoop" ]; then
cat <<EOD cat <<EOD
The next steps are: The next steps are:
If installing on a pm1 node using non-distributed install If installation on pm1:
export COLUMNSTORE_INSTALL_DIR=$COLUMNSTORE_INSTALL_DIR export COLUMNSTORE_INSTALL_DIR=$COLUMNSTORE_INSTALL_DIR
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH export LD_LIBRARY_PATH=$LD_LIBRARY_PATH
$installdir/bin/postConfigure -i $installdir $installdir/bin/postConfigure -i $installdir
If installing on a pm1 node using distributed install If installing on a node other than pm1:
export COLUMNSTORE_INSTALL_DIR=$COLUMNSTORE_INSTALL_DIR
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH
$installdir/bin/postConfigure -i $installdir -d
If installing on a non-pm1 using the non-distributed option:
export COLUMNSTORE_INSTALL_DIR=$COLUMNSTORE_INSTALL_DIR export COLUMNSTORE_INSTALL_DIR=$COLUMNSTORE_INSTALL_DIR
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH export LD_LIBRARY_PATH=$LD_LIBRARY_PATH
@@ -346,15 +340,11 @@ EOD
cat <<EOD cat <<EOD
The next step is: The next step is:
If installing on a pm1 node using non-distributed install If installation on pm1:
$installdir/bin/postConfigure $installdir/bin/postConfigure
If installing on a pm1 node using distributed install If installing on a node other than pm1:
$installdir/bin/postConfigure -d
If installing on a non-pm1 using the non-distributed option:
$installdir/bin/columnstore start $installdir/bin/columnstore start
@@ -386,7 +376,7 @@ export LD_LIBRARY_PATH=$LD_LIBRARY_PATH
. $installdir/bin/setenv-hdfs-20 . $installdir/bin/setenv-hdfs-20
$installdir/bin/postConfigure -i $installdir $installdir/bin/postConfigure -i $installdir
If installing on a non-pm1 using the non-distributed option: If installing on a node other than pm1:
export COLUMNSTORE_INSTALL_DIR=$COLUMNSTORE_INSTALL_DIR export COLUMNSTORE_INSTALL_DIR=$COLUMNSTORE_INSTALL_DIR
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH export LD_LIBRARY_PATH=$LD_LIBRARY_PATH
@@ -400,7 +390,7 @@ export COLUMNSTORE_INSTALL_DIR=$COLUMNSTORE_INSTALL_DIR
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH export LD_LIBRARY_PATH=$LD_LIBRARY_PATH
$installdir/bin/postConfigure -i $installdir $installdir/bin/postConfigure -i $installdir
If installing on a non-pm1 using the non-distributed option: If installing on a node other than pm1:
export COLUMNSTORE_INSTALL_DIR=$COLUMNSTORE_INSTALL_DIR export COLUMNSTORE_INSTALL_DIR=$COLUMNSTORE_INSTALL_DIR
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH export LD_LIBRARY_PATH=$LD_LIBRARY_PATH
@@ -415,7 +405,7 @@ If you are intending to install MariaDB Columnstore over Hadoop, the next steps
. $installdir/bin/setenv-hdfs-20 . $installdir/bin/setenv-hdfs-20
$installdir/bin/postConfigure $installdir/bin/postConfigure
If installing on a non-pm1 using the non-distributed option: If installing on a node other than pm1:
. $installdir/bin/setenv-hdfs-20 . $installdir/bin/setenv-hdfs-20
$installdir/bin/columnstore start $installdir/bin/columnstore start
@@ -426,7 +416,7 @@ If installing on a pm1 node:
$installdir/bin/postConfigure $installdir/bin/postConfigure
If installing on a non-pm1 using the non-distributed option: If installing on a node other than pm1:
$installdir/bin/columnstore start $installdir/bin/columnstore start
@@ -472,7 +462,7 @@ If installing on a pm1 node:
. $installdir/bin/setenv-hdfs-12 . $installdir/bin/setenv-hdfs-12
$installdir/bin/postConfigure $installdir/bin/postConfigure
If installing on a non-pm1 using the non-distributed option: If installing on a node other than pm1:
. $installdir/bin/setenv-hdfs-12 . $installdir/bin/setenv-hdfs-12
$installdir/bin/columnstore start $installdir/bin/columnstore start
@@ -483,7 +473,7 @@ If installing on a pm1 node:
$installdir/bin/postConfigure $installdir/bin/postConfigure
If installing on a non-pm1 using the non-distributed option: If installing on a node other than pm1:
$installdir/bin/columnstore start $installdir/bin/columnstore start

View File

@@ -317,7 +317,7 @@ int main(int argc, char* argv[])
cout << " Enter one of the options within [], if available, or" << endl; cout << " Enter one of the options within [], if available, or" << endl;
cout << " Enter a new value" << endl << endl; cout << " Enter a new value" << endl << endl;
cout << endl; cout << endl;
cout << "Usage: postConfigure [-h][-c][-u][-p][-qs][-qm][-qa][-port][-i][-n][-d][-sn][-pm-ip-addrs][-um-ip-addrs][-pm-count][-um-count][-x][-xr][-numBlocksPct][-totalUmMemory]" << endl; cout << "Usage: postConfigure [-h][-c][-u][-p][-qs][-qm][-qa][-port][-i][-sn][-pm-ip-addrs][-um-ip-addrs][-pm-count][-um-count][-x][-xr][-numBlocksPct][-totalUmMemory]" << endl;
cout << " -h Help" << endl; cout << " -h Help" << endl;
cout << " -c Config File to use to extract configuration data, default is Columnstore.xml.rpmsave" << endl; cout << " -c Config File to use to extract configuration data, default is Columnstore.xml.rpmsave" << endl;
cout << " -u Upgrade, Install using the Config File from -c, default to Columnstore.xml.rpmsave" << endl; cout << " -u Upgrade, Install using the Config File from -c, default to Columnstore.xml.rpmsave" << endl;
@@ -511,7 +511,7 @@ int main(int argc, char* argv[])
else else
{ {
cout << " ERROR: Invalid Argument = " << argv[i] << endl; cout << " ERROR: Invalid Argument = " << argv[i] << endl;
cout << " Usage: postConfigure [-h][-c][-u][-p][-qs][-qm][-qa][-port][-i][-n][-d][-sn][-pm-ip-addrs][-um-ip-addrs][-pm-count][-um-count][-x][-xr][-numBlocksPct][-totalUmMemory]" << endl; cout << " Usage: postConfigure [-h][-c][-u][-p][-qs][-qm][-qa][-port][-i][-sn][-pm-ip-addrs][-um-ip-addrs][-pm-count][-um-count][-x][-xr][-numBlocksPct][-totalUmMemory]" << endl;
exit (1); exit (1);
} }
} }
@@ -792,7 +792,6 @@ int main(int argc, char* argv[])
try try
{ {
sysConfig->setConfig(InstallSection, "MySQLPort", mysqlPort); sysConfig->setConfig(InstallSection, "MySQLPort", mysqlPort);
sysConfig->setConfig("CrossEngineSupport", "Port", mysqlPort);
} }
catch (...) catch (...)
{} {}

1
storage-manager/include/messageFormat.h Normal file → Executable file
View File

@@ -65,6 +65,7 @@ static const uint32_t SM_MSG_END=0x9d5bc31b;
static const uint32_t SM_HEADER_LEN = sizeof(sm_msg_header); static const uint32_t SM_HEADER_LEN = sizeof(sm_msg_header);
// the unix socket StorageManager is listening on // the unix socket StorageManager is listening on
__attribute__ ((unused))
static const char *socket_name = "\0storagemanager"; static const char *socket_name = "\0storagemanager";
#pragma GCC diagnostic pop #pragma GCC diagnostic pop

View File

@@ -300,7 +300,8 @@ int SessionManager::start()
//read this snippet and keep going //read this snippet and keep going
len = ::read(fds[socketIncr].fd, &recv_buffer[remainingBytes], peakLength); len = ::read(fds[socketIncr].fd, &recv_buffer[remainingBytes], peakLength);
remainingBytes = endOfData; remainingBytes = endOfData;
assert(len == peakLength); if (len != peakLength)
logger->log(LOG_ERR,"Read returned length != peakLength. ( %i != %i )", len, peakLength);
continue; continue;
} }
@@ -327,7 +328,8 @@ int SessionManager::start()
{ {
//logger->log(LOG_DEBUG,"No SM_MSG_START"); //logger->log(LOG_DEBUG,"No SM_MSG_START");
len = ::read(fds[socketIncr].fd, &recv_buffer[remainingBytes], peakLength); len = ::read(fds[socketIncr].fd, &recv_buffer[remainingBytes], peakLength);
assert(len == peakLength); if (len != peakLength)
logger->log(LOG_ERR,"Read returned length != peakLength. ( %i != %i )", len, peakLength);
// we know the msg header isn't in position [0, endOfData - i), so throw that out // we know the msg header isn't in position [0, endOfData - i), so throw that out
// and copy [i, endOfData) to the front of the buffer to be // and copy [i, endOfData) to the front of the buffer to be
// checked by the next iteration. // checked by the next iteration.
@@ -351,6 +353,8 @@ int SessionManager::start()
{ {
//logger->log(LOG_DEBUG,"SM_MSG_START data is next message"); //logger->log(LOG_DEBUG,"SM_MSG_START data is next message");
len = ::read(fds[socketIncr].fd, &recv_buffer[remainingBytes], peakLength); len = ::read(fds[socketIncr].fd, &recv_buffer[remainingBytes], peakLength);
if (len != peakLength)
logger->log(LOG_ERR,"Read returned length != peakLength. ( %i != %i )", len, peakLength);
} }
//Disable polling on this socket //Disable polling on this socket
fds[socketIncr].events = 0; fds[socketIncr].events = 0;

View File

@@ -24,9 +24,11 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/un.h> #include <sys/un.h>
#include <fcntl.h> #include <fcntl.h>
#include <time.h>
#include "IOCoordinator.h" #include "IOCoordinator.h"
#include "SMFileSystem.h" #include "SMFileSystem.h"
#include "SMDataFile.h"
#include "messageFormat.h" #include "messageFormat.h"
using namespace std; using namespace std;
@@ -80,7 +82,11 @@ void lsOffline(const char *path)
} }
else else
cout.width(15); cout.width(15);
cout << right << _stat.st_size << left << " " << entry << endl;
struct tm *my_tm = localtime(&_stat.st_mtim.tv_sec);
char date[100];
strftime(date, 100, "%b %e %H:%M", my_tm);
cout << right << _stat.st_size << left << " " << date << left << " " << entry << endl;
} }
else else
{ {
@@ -107,6 +113,8 @@ void lsOnline(const char *path)
p = base / entry; p = base / entry;
bool isDir = fs.isDir(p.string().c_str()); bool isDir = fs.isDir(p.string().c_str());
ssize_t size = fs.size(p.string().c_str()); ssize_t size = fs.size(p.string().c_str());
idbdatafile::SMDataFile df(p.string().c_str(),O_RDONLY,1);
time_t mtime = df.mtime();
if (size >= 0) if (size >= 0)
{ {
if (isDir) if (isDir)
@@ -116,7 +124,11 @@ void lsOnline(const char *path)
} }
else else
cout.width(15); cout.width(15);
cout << right << size << left << " " << entry << endl;
struct tm *my_tm = localtime(&mtime);
char date[100];
strftime(date, 100, "%b %e %H:%M", my_tm);
cout << right << size << left << " " << date << left << " " << entry << endl;
} }
else else
{ {

View File

@@ -4,7 +4,7 @@ include_directories( ${ENGINE_COMMON_INCLUDES}
########### next target ############### ########### next target ###############
set(regr_LIB_SRCS regr_avgx.cpp regr_avgy.cpp regr_count.cpp regr_slope.cpp regr_intercept.cpp regr_r2.cpp corr.cpp regr_sxx.cpp regr_syy.cpp regr_sxy.cpp covar_pop.cpp covar_samp.cpp) set(regr_LIB_SRCS regr_avgx.cpp regr_avgy.cpp regr_count.cpp regr_slope.cpp regr_intercept.cpp regr_r2.cpp corr.cpp regr_sxx.cpp regr_syy.cpp regr_sxy.cpp covar_pop.cpp covar_samp.cpp moda.cpp)
add_definitions(-DMYSQL_DYNAMIC_PLUGIN) add_definitions(-DMYSQL_DYNAMIC_PLUGIN)
@@ -16,7 +16,7 @@ install(TARGETS regr DESTINATION ${ENGINE_LIBDIR} COMPONENT libs)
set(regr_mysql_LIB_SRCS regrmysql.cpp) set(regr_mysql_LIB_SRCS regrmysql.cpp modamysql.cpp)
add_library(regr_mysql SHARED ${regr_mysql_LIB_SRCS}) add_library(regr_mysql SHARED ${regr_mysql_LIB_SRCS})

480
utils/regr/moda.cpp Normal file
View File

@@ -0,0 +1,480 @@
/* Copyright (C) 2019 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <sstream>
#include <cstring>
#include <typeinfo>
#include "moda.h"
#include "bytestream.h"
#include "objectreader.h"
using namespace mcsv1sdk;
// This is the standard way to get a UDAF function into the system's
// map of UDAF for lookup
class Add_moda_ToUDAFMap
{
public:
Add_moda_ToUDAFMap()
{
UDAFMap::getMap()["moda"] = new moda();
}
};
static Add_moda_ToUDAFMap addToMap;
// There are a few design options when creating a generic moda function:
// 1) Always use DOUBLE for internal storage
// Pros: can handle data from any native SQL type.
// Cons: If MODA(SUM()) is called, then the LONG DOUBLE returned by SUM will
// be truncated.
// It requires 8 bytes in the hash table and requires streaming 8 bytes
// per entry regardles of how small it could have been.
// 2) Always use LONG DOUBLE for internal storage
// Pros: Solves the problem of MODA(SUM())
// Cons: It requires 16 bytes in the hash table and requires streaming 16 bytes
// per entry regardles of how small it could have been.
// 3) Use the data type of the column for internal storage
// Pros: Can handle MODA(SUM()) because LONG DOUBLE all types are handeled
// Only the data size needed is stored in the hash table and streamed
//
// This class implements option 3 by creating templated classes.
// There are two moda classes, the main one called moda, which is basically
// an adapter (Pattern) to the templated class called Moda_impl_T.
//
// The way the API works, each function class is instantiated exactly once per
// executable and then accessed via a map. This means that the function classes
// could be used by any active query, or more than once by a single query. These
// classes have no data fields for this reason. All data for a specific query is
// maintained by the context object.
//
// Each possible templated instantation is created ate moda creation during startup.
// They are the Moda_impl_T members at the bottom of the moda class definition.
// At runtime getImpl() gets the right one for the datatype involved based on context.
//
// More template magic is done in the ModaData class to create and maintained
// a hash of the correct type.
// getImpl returns the current modaImpl or gets the correct one based on context.
mcsv1_UDAF* moda::getImpl(mcsv1Context* context)
{
ModaData* data = static_cast<ModaData*>(context->getUserData());
if (data->modaImpl)
return data->modaImpl;
switch (context->getResultType())
{
case execplan::CalpontSystemCatalog::TINYINT:
data->modaImpl = &moda_impl_int8;
break;
case execplan::CalpontSystemCatalog::SMALLINT:
data->modaImpl = &moda_impl_int16;
break;
case execplan::CalpontSystemCatalog::MEDINT:
case execplan::CalpontSystemCatalog::INT:
data->modaImpl = &moda_impl_int32;
break;
case execplan::CalpontSystemCatalog::BIGINT:
data->modaImpl = &moda_impl_int64;
break;
case execplan::CalpontSystemCatalog::DECIMAL:
case execplan::CalpontSystemCatalog::UDECIMAL:
switch (context->getColWidth())
{
case 1:
data->modaImpl = &moda_impl_int8;
break;
case 2:
data->modaImpl = &moda_impl_int16;
break;
case 4:
data->modaImpl = &moda_impl_int32;
break;
default:
data->modaImpl = &moda_impl_int64;
break;
}
break;
case execplan::CalpontSystemCatalog::UTINYINT:
data->modaImpl = &moda_impl_uint8;
break;
case execplan::CalpontSystemCatalog::USMALLINT:
data->modaImpl = &moda_impl_uint16;
break;
case execplan::CalpontSystemCatalog::UMEDINT:
case execplan::CalpontSystemCatalog::UINT:
data->modaImpl = &moda_impl_uint32;
break;
case execplan::CalpontSystemCatalog::UBIGINT:
data->modaImpl = &moda_impl_uint64;
break;
case execplan::CalpontSystemCatalog::FLOAT:
data->modaImpl = &moda_impl_float;
break;
case execplan::CalpontSystemCatalog::DOUBLE:
data->modaImpl = &moda_impl_double;
break;
case execplan::CalpontSystemCatalog::LONGDOUBLE:
data->modaImpl = &moda_impl_longdouble;
break;
default:
data->modaImpl = NULL;
}
return data->modaImpl;
}
mcsv1_UDAF::ReturnCode moda::init(mcsv1Context* context,
ColumnDatum* colTypes)
{
if (context->getParameterCount() < 1)
{
// The error message will be prepended with
// "The storage engine for the table doesn't support "
context->setErrorMessage("moda() with 0 arguments");
return mcsv1_UDAF::ERROR;
}
if (context->getParameterCount() > 1)
{
context->setErrorMessage("moda() with more than 1 argument");
return mcsv1_UDAF::ERROR;
}
if (!(execplan::isNumeric(colTypes[0].dataType)))
{
// The error message will be prepended with
// "The storage engine for the table doesn't support "
context->setErrorMessage("moda() with non-numeric argument");
return mcsv1_UDAF::ERROR;
}
context->setResultType(colTypes[0].dataType);
if (colTypes[0].dataType == execplan::CalpontSystemCatalog::DECIMAL
|| colTypes[0].dataType == execplan::CalpontSystemCatalog::UDECIMAL)
{
if (colTypes[0].precision < 3)
{
context->setColWidth(1);
}
else if (colTypes[0].precision < 4)
{
context->setColWidth(2);
}
else if (colTypes[0].precision < 9)
{
context->setColWidth(4);
}
else
{
context->setColWidth(8);
}
}
mcsv1_UDAF* impl = getImpl(context);
if (!impl)
{
// The error message will be prepended with
// "The storage engine for the table doesn't support "
context->setErrorMessage("moda() with non-numeric argument");
return mcsv1_UDAF::ERROR;
}
context->setRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS);
return impl->init(context, colTypes);
}
template<class T>
mcsv1_UDAF::ReturnCode Moda_impl_T<T>::init(mcsv1Context* context,
ColumnDatum* colTypes)
{
context->setScale(context->getScale());
context->setPrecision(19);
return mcsv1_UDAF::SUCCESS;
}
template<class T>
mcsv1_UDAF::ReturnCode Moda_impl_T<T>::reset(mcsv1Context* context)
{
ModaData* data = static_cast<ModaData*>(context->getUserData());
data->fReturnType = context->getResultType();
data->fColWidth = context->getColWidth();
data->clear<T>();
return mcsv1_UDAF::SUCCESS;
}
template<class T>
mcsv1_UDAF::ReturnCode Moda_impl_T<T>::nextValue(mcsv1Context* context, ColumnDatum* valsIn)
{
static_any::any& valIn = valsIn[0].columnData;
ModaData* data = static_cast<ModaData*>(context->getUserData());
std::unordered_map<T, uint32_t>* map = data->getMap<T>();
if (valIn.empty())
{
return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on.
}
T val = convertAnyTo<T>(valIn);
if (context->getResultType() == execplan::CalpontSystemCatalog::DOUBLE)
{
// For decimal types, we need to move the decimal point.
uint32_t scale = valsIn[0].scale;
if (val != 0 && scale > 0)
{
val /= pow(10.0, (double)scale);
}
}
data->fSum += val;
++data->fCount;
(*map)[val]++;
return mcsv1_UDAF::SUCCESS;
}
template<class T>
mcsv1_UDAF::ReturnCode Moda_impl_T<T>::subEvaluate(mcsv1Context* context, const UserData* userDataIn)
{
if (!userDataIn)
{
return mcsv1_UDAF::SUCCESS;
}
ModaData* outData = static_cast<ModaData*>(context->getUserData());
const ModaData* inData = static_cast<const ModaData*>(userDataIn);
std::unordered_map<T, uint32_t>* outMap = outData->getMap<T>();
std::unordered_map<T, uint32_t>* inMap = inData->getMap<T>();
typename std::unordered_map<T, uint32_t>::const_iterator iter;
for (iter = inMap->begin(); iter != inMap->end(); ++iter)
{
(*outMap)[iter->first] += iter->second;
}
// AVG
outData->fSum += inData->fSum;
outData->fCount += inData->fCount;
return mcsv1_UDAF::SUCCESS;
}
template<class T>
mcsv1_UDAF::ReturnCode Moda_impl_T<T>::evaluate(mcsv1Context* context, static_any::any& valOut)
{
uint64_t maxCnt = 0;
T avg = 0;
T val = 0;
ModaData* data = static_cast<ModaData*>(context->getUserData());
std::unordered_map<T, uint32_t>* map = data->getMap<T>();
if (map->size() == 0)
{
valOut = (T)0;
return mcsv1_UDAF::SUCCESS;
}
avg = data->fCount ? data->fSum / data->fCount : 0;
typename std::unordered_map<T, uint32_t>::iterator iter;
for (iter = map->begin(); iter != map->end(); ++iter)
{
if (iter->second > maxCnt)
{
val = iter->first;
maxCnt = iter->second;
}
else if (iter->second == maxCnt)
{
// Tie breaker: choose the closest to avg. If still tie, choose smallest
if ((std::fabs(val-avg) > std::fabs(iter->first-avg))
|| ((std::fabs(val-avg) == std::fabs(iter->first-avg)) && (std::fabs(val) > std::fabs(iter->first))))
{
val = iter->first;
}
}
}
// If scale is > 0, then the original type was DECIMAL. Set the
// ResultType to DECIMAL so the delivery logic moves the decimal point.
if (context->getScale() > 0)
context->setResultType(execplan::CalpontSystemCatalog::DECIMAL);
valOut = val;
return mcsv1_UDAF::SUCCESS;
}
template<class T>
mcsv1_UDAF::ReturnCode Moda_impl_T<T>::dropValue(mcsv1Context* context, ColumnDatum* valsDropped)
{
static_any::any& valDropped = valsDropped[0].columnData;
ModaData* data = static_cast<ModaData*>(context->getUserData());
std::unordered_map<T, uint32_t>* map = data->getMap<T>();
if (valDropped.empty())
{
return mcsv1_UDAF::SUCCESS; // Ought not happen when UDAF_IGNORE_NULLS is on.
}
T val = convertAnyTo<T>(valDropped);
data->fSum -= val;
--data->fCount;
(*map)[val]--;
return mcsv1_UDAF::SUCCESS;
}
void ModaData::serialize(messageqcpp::ByteStream& bs) const
{
bs << fReturnType;
bs << fSum;
bs << fCount;
bs << fColWidth;
switch ((execplan::CalpontSystemCatalog::ColDataType)fReturnType)
{
case execplan::CalpontSystemCatalog::TINYINT:
serializeMap<int8_t>(bs);
break;
case execplan::CalpontSystemCatalog::SMALLINT:
serializeMap<int16_t>(bs);
break;
case execplan::CalpontSystemCatalog::MEDINT:
case execplan::CalpontSystemCatalog::INT:
serializeMap<int32_t>(bs);
break;
case execplan::CalpontSystemCatalog::BIGINT:
serializeMap<int64_t>(bs);
break;
case execplan::CalpontSystemCatalog::DECIMAL:
case execplan::CalpontSystemCatalog::UDECIMAL:
switch (fColWidth)
{
case 1:
serializeMap<int8_t>(bs);
break;
case 2:
serializeMap<int16_t>(bs);
break;
case 4:
serializeMap<int32_t>(bs);
break;
default:
serializeMap<int64_t>(bs);
break;
}
break;
case execplan::CalpontSystemCatalog::UTINYINT:
serializeMap<uint8_t>(bs);
break;
case execplan::CalpontSystemCatalog::USMALLINT:
serializeMap<uint16_t>(bs);
break;
case execplan::CalpontSystemCatalog::UMEDINT:
case execplan::CalpontSystemCatalog::UINT:
serializeMap<uint32_t>(bs);
break;
case execplan::CalpontSystemCatalog::UBIGINT:
serializeMap<uint64_t>(bs);
break;
case execplan::CalpontSystemCatalog::FLOAT:
serializeMap<float>(bs);
break;
case execplan::CalpontSystemCatalog::DOUBLE:
serializeMap<double>(bs);
break;
case execplan::CalpontSystemCatalog::LONGDOUBLE:
serializeMap<long double>(bs);
break;
default:
throw std::runtime_error("ModaData::serialize with bad data type");
break;
}
}
void ModaData::unserialize(messageqcpp::ByteStream& bs)
{
bs >> fReturnType;
bs >> fSum;
bs >> fCount;
bs >> fColWidth;
switch ((execplan::CalpontSystemCatalog::ColDataType)fReturnType)
{
case execplan::CalpontSystemCatalog::TINYINT:
unserializeMap<int8_t>(bs);
break;
case execplan::CalpontSystemCatalog::SMALLINT:
unserializeMap<int16_t>(bs);
break;
case execplan::CalpontSystemCatalog::MEDINT:
case execplan::CalpontSystemCatalog::INT:
unserializeMap<int32_t>(bs);
break;
case execplan::CalpontSystemCatalog::BIGINT:
unserializeMap<int64_t>(bs);
break;
case execplan::CalpontSystemCatalog::DECIMAL:
case execplan::CalpontSystemCatalog::UDECIMAL:
switch (fColWidth)
{
case 1:
unserializeMap<int8_t>(bs);
break;
case 2:
unserializeMap<int16_t>(bs);
break;
case 4:
unserializeMap<int32_t>(bs);
break;
default:
unserializeMap<int64_t>(bs);
break;
}
break;
case execplan::CalpontSystemCatalog::UTINYINT:
unserializeMap<uint8_t>(bs);
break;
case execplan::CalpontSystemCatalog::USMALLINT:
unserializeMap<uint16_t>(bs);
break;
case execplan::CalpontSystemCatalog::UMEDINT:
case execplan::CalpontSystemCatalog::UINT:
unserializeMap<uint32_t>(bs);
break;
case execplan::CalpontSystemCatalog::UBIGINT:
unserializeMap<uint64_t>(bs);
break;
case execplan::CalpontSystemCatalog::FLOAT:
unserializeMap<float>(bs);
break;
case execplan::CalpontSystemCatalog::DOUBLE:
unserializeMap<double>(bs);
break;
case execplan::CalpontSystemCatalog::LONGDOUBLE:
unserializeMap<long double>(bs);
break;
default:
throw std::runtime_error("ModaData::unserialize with bad data type");
break;
}
}

236
utils/regr/moda.h Normal file
View File

@@ -0,0 +1,236 @@
/* Copyright (C) 2019 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/***********************************************************************
* $Id$
*
* moda.h
***********************************************************************/
/**
* Columnstore interface for the moda User Defined Aggregate
* Functions (UDAF) and User Defined Analytic Functions (UDAnF).
*
* To notify mysqld about the new function:
*
* CREATE AGGREGATE FUNCTION moda returns STRING soname 'libregr_mysql.so';
*
* moda returns the value with the greatest number of occurances in
* the dataset with ties being broken by:
* 1) closest to AVG
* 2) smallest value
*/
#ifndef HEADER_moda
#define HEADER_moda
#include <cstdlib>
#include <string>
#include <vector>
#include <unordered_map>
#include "mcsv1_udaf.h"
#include "calpontsystemcatalog.h"
#include "windowfunctioncolumn.h"
#if defined(_MSC_VER) && defined(xxxRGNODE_DLLEXPORT)
#define EXPORT __declspec(dllexport)
#else
#define EXPORT
#endif
namespace mcsv1sdk
{
// Override UserData for data storage
struct ModaData : public UserData
{
ModaData() : fMap(NULL),
fReturnType((uint32_t)execplan::CalpontSystemCatalog::UNDEFINED),
fColWidth(0),
modaImpl(NULL)
{};
virtual ~ModaData() {}
virtual void serialize(messageqcpp::ByteStream& bs) const;
virtual void unserialize(messageqcpp::ByteStream& bs);
template<class T>
std::unordered_map<T, uint32_t>* getMap()
{
if (!fMap)
{
// Just in time creation
fMap = new std::unordered_map<T, uint32_t>;
}
return (std::unordered_map<T, uint32_t>*) fMap;
}
// The const version is only called by serialize()
// It shouldn't (and can't) create a new map.
template<class T>
std::unordered_map<T, uint32_t>* getMap() const
{
if (!fMap)
{
throw std::runtime_error("ModaData::serialize with no map");
}
return (std::unordered_map<T, uint32_t>*) fMap;
}
template<class T>
void clear()
{
fSum = 0.0;
fCount = 0;
getMap<T>()->clear();
}
long double fSum;
uint64_t fCount;
void* fMap; // Will be of type unordered_map<>
uint32_t fReturnType;
uint32_t fColWidth;
mcsv1_UDAF* modaImpl; // A pointer to one of the Moda_impl_T concrete classes
private:
// For now, copy construction is unwanted
ModaData(UserData&);
// Templated map streamers
template<class T>
void serializeMap(messageqcpp::ByteStream& bs) const
{
std::unordered_map<T, uint32_t>* map = getMap<T>();
typename std::unordered_map<T, uint32_t>::const_iterator iter;
bs << (uint64_t)map->size();
for (iter = map->begin(); iter != map->end(); ++iter)
{
bs << iter->first;
bs << iter->second;
}
}
template<class T>
void unserializeMap(messageqcpp::ByteStream& bs)
{
uint32_t cnt;
T num;
uint64_t sz;
bs >> sz;
std::unordered_map<T, uint32_t>* map = getMap<T>();
map->clear();
for (uint64_t i = 0; i < sz; ++i)
{
bs >> num;
bs >> cnt;
(*map)[num] = cnt;
}
}
};
template<class T>
class Moda_impl_T : public mcsv1_UDAF
{
public:
// Defaults OK
Moda_impl_T() {};
virtual ~Moda_impl_T() {};
virtual mcsv1_UDAF::ReturnCode init(mcsv1Context* context,
ColumnDatum* colTypes);
virtual mcsv1_UDAF::ReturnCode reset(mcsv1Context* context);
virtual mcsv1_UDAF::ReturnCode nextValue(mcsv1Context* context, ColumnDatum* valsIn);
virtual mcsv1_UDAF::ReturnCode subEvaluate(mcsv1Context* context, const UserData* valIn);
virtual mcsv1_UDAF::ReturnCode evaluate(mcsv1Context* context, static_any::any& valOut);
virtual mcsv1_UDAF::ReturnCode dropValue(mcsv1Context* context, ColumnDatum* valsDropped);
// Dummy: not used
virtual mcsv1_UDAF::ReturnCode createUserData(UserData*& userData, int32_t& length)
{
return mcsv1_UDAF::SUCCESS;
}
};
// moda returns the modal value of the dataset. If more than one value
// have the same maximum number of occurances, then the one closest to
// AVG wins. If two are the same distance from AVG, then the smaller wins.
class moda : public mcsv1_UDAF
{
public:
// Defaults OK
moda() : mcsv1_UDAF() {};
virtual ~moda() {};
virtual mcsv1_UDAF::ReturnCode init(mcsv1Context* context,
ColumnDatum* colTypes);
virtual ReturnCode reset(mcsv1Context* context)
{
return getImpl(context)->reset(context);
}
virtual mcsv1_UDAF::ReturnCode nextValue(mcsv1Context* context, ColumnDatum* valsIn)
{
return getImpl(context)->nextValue(context, valsIn);
}
virtual mcsv1_UDAF::ReturnCode subEvaluate(mcsv1Context* context, const UserData* valIn)
{
return getImpl(context)->subEvaluate(context, valIn);
}
virtual mcsv1_UDAF::ReturnCode evaluate(mcsv1Context* context, static_any::any& valOut)
{
return getImpl(context)->evaluate(context, valOut);
}
virtual mcsv1_UDAF::ReturnCode dropValue(mcsv1Context* context, ColumnDatum* valsDropped)
{
return getImpl(context)->dropValue(context, valsDropped);
}
mcsv1_UDAF::ReturnCode createUserData(UserData*& userData, int32_t& length)
{
userData = new ModaData;
length = sizeof(ModaData);
return mcsv1_UDAF::SUCCESS;
}
mcsv1_UDAF* getImpl(mcsv1Context* context);
protected:
Moda_impl_T<int8_t> moda_impl_int8;
Moda_impl_T<int16_t> moda_impl_int16;
Moda_impl_T<int32_t> moda_impl_int32;
Moda_impl_T<int64_t> moda_impl_int64;
Moda_impl_T<uint8_t> moda_impl_uint8;
Moda_impl_T<uint16_t> moda_impl_uint16;
Moda_impl_T<uint32_t> moda_impl_uint32;
Moda_impl_T<uint64_t> moda_impl_uint64;
Moda_impl_T<float> moda_impl_float;
Moda_impl_T<double> moda_impl_double;
Moda_impl_T<long double> moda_impl_longdouble;
};
}; // namespace
#undef EXPORT
#endif // HEADER_mode.h

288
utils/regr/modamysql.cpp Normal file
View File

@@ -0,0 +1,288 @@
#include <my_config.h>
#include <cmath>
#include <iostream>
#include <sstream>
#include <string.h>
#include <tr1/unordered_map>
#include <algorithm>
#include "idb_mysql.h"
namespace
{
inline bool isNumeric(int type, const char* attr)
{
if (type == INT_RESULT || type == REAL_RESULT || type == DECIMAL_RESULT)
{
return true;
}
#if _MSC_VER
if (_strnicmp("NULL", attr, 4) == 0))
#else
if (strncasecmp("NULL", attr, 4) == 0)
#endif
{
return true;
}
return false;
}
struct moda_data
{
long double fSum;
uint64_t fCount;
enum Item_result fReturnType;
std::tr1::unordered_map<int64_t, uint32_t> mapINT;
std::tr1::unordered_map<double, uint32_t> mapREAL;
std::tr1::unordered_map<long double, uint32_t> mapDECIMAL;
void clear()
{
fSum = 0.0;
fCount = 0;
mapINT.clear();
mapREAL.clear();
mapDECIMAL.clear();
}
};
}
extern "C"
{
#ifdef _MSC_VER
__declspec(dllexport)
#endif
my_bool moda_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
struct moda_data* data;
if (args->arg_count != 1)
{
strcpy(message,"moda() requires one argument");
return 1;
}
if (!isNumeric(args->arg_type[0], args->attributes[0]))
{
strcpy(message,"moda() with a non-numeric argument");
return 1;
}
data = new moda_data;
data->fReturnType = args->arg_type[0];
data->fCount = 0;
data->fSum = 0.0;
initid->ptr = (char*)data;
return 0;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void moda_deinit(UDF_INIT* initid)
{
struct moda_data* data = (struct moda_data*)initid->ptr;
data->clear();
delete data;
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void moda_clear(UDF_INIT* initid, char* is_null __attribute__((unused)),
char* message __attribute__((unused)))
{
struct moda_data* data = (struct moda_data*)initid->ptr;
data->clear();
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void moda_add(UDF_INIT* initid,
UDF_ARGS* args,
char* is_null,
char* message __attribute__((unused)))
{
// Test for NULL
if (args->args[0] == 0)
{
return;
}
struct moda_data* data = (struct moda_data*)initid->ptr;
data->fCount++;
switch (args->arg_type[0])
{
case INT_RESULT:
{
int64_t val = *((int64_t*)args->args[0]);
data->fSum += (long double)val;
data->mapINT[val]++;
break;
}
case REAL_RESULT:
{
double val = *((double*)args->args[0]);
data->fSum += val;
data->mapREAL[val]++;
break;
}
case DECIMAL_RESULT:
case STRING_RESULT:
{
long double val = strtold(args->args[0], 0);
data->fSum += val;
data->mapDECIMAL[val]++;
break;
}
default:
break;
}
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
void moda_remove(UDF_INIT* initid, UDF_ARGS* args,
char* is_null,
char* message __attribute__((unused)))
{
// Test for NULL
if (args->args[0] == 0)
{
return;
}
struct moda_data* data = (struct moda_data*)initid->ptr;
data->fCount--;
switch (args->arg_type[0])
{
case INT_RESULT:
{
int64_t val = *((int64_t*)args->args[0]);
data->fSum -= (long double)val;
data->mapINT[val]--;
break;
}
case REAL_RESULT:
{
double val = *((double*)args->args[0]);
data->fSum -= val;
data->mapREAL[val]--;
break;
}
case DECIMAL_RESULT:
case STRING_RESULT:
{
long double val = strtold(args->args[0], 0);
data->fSum -= val;
data->mapDECIMAL[val]--;
break;
}
default:
break;
}
}
#ifdef _MSC_VER
__declspec(dllexport)
#endif
char* moda(UDF_INIT* initid, UDF_ARGS* args __attribute__((unused)),
char* is_null, char* error __attribute__((unused)))
{
struct moda_data* data = (struct moda_data*)initid->ptr;
uint32_t maxCnt = 0.0;
switch (args->arg_type[0])
{
case INT_RESULT:
{
typename std::tr1::unordered_map<int64_t, uint32_t>::iterator iter;
int64_t avg = (int64_t)data->fCount ? data->fSum / data->fCount : 0;
int64_t val = 0.0;
for (iter = data->mapINT.begin(); iter != data->mapINT.end(); ++iter)
{
if (iter->second > maxCnt)
{
val = iter->first;
maxCnt = iter->second;
}
else if (iter->second == maxCnt)
{
// Tie breaker: choose the closest to avg. If still tie, choose smallest
if ((abs(val-avg) > abs(iter->first-avg))
|| ((abs(val-avg) == abs(iter->first-avg)) && (abs(val) > abs(iter->first))))
{
val = iter->first;
}
}
}
std::ostringstream oss;
oss << val;
return const_cast<char*>(oss.str().c_str());
break;
}
case REAL_RESULT:
{
typename std::tr1::unordered_map<double, uint32_t>::iterator iter;
double avg = data->fCount ? data->fSum / data->fCount : 0;
double val = 0.0;
for (iter = data->mapREAL.begin(); iter != data->mapREAL.end(); ++iter)
{
if (iter->second > maxCnt)
{
val = iter->first;
maxCnt = iter->second;
}
else if (iter->second == maxCnt)
{
// Tie breaker: choose the closest to avg. If still tie, choose smallest
if ((abs(val-avg) > abs(iter->first-avg))
|| ((abs(val-avg) == abs(iter->first-avg)) && (abs(val) > abs(iter->first))))
{
val = iter->first;
}
}
}
std::ostringstream oss;
oss << val;
return const_cast<char*>(oss.str().c_str());
break;
}
case DECIMAL_RESULT:
case STRING_RESULT:
{
typename std::tr1::unordered_map<long double, uint32_t>::iterator iter;
long double avg = data->fCount ? data->fSum / data->fCount : 0;
long double val = 0.0;
for (iter = data->mapDECIMAL.begin(); iter != data->mapDECIMAL.end(); ++iter)
{
if (iter->second > maxCnt)
{
val = iter->first;
maxCnt = iter->second;
}
else if (iter->second == maxCnt)
{
long double thisVal = iter->first;
// Tie breaker: choose the closest to avg. If still tie, choose smallest
if ((abs(val-avg) > abs(thisVal-avg))
|| ((abs(val-avg) == abs(thisVal-avg)) && (abs(val) > abs(thisVal))))
{
val = thisVal;
}
}
}
std::ostringstream oss;
oss << val;
return const_cast<char*>(oss.str().c_str());
break;
}
default:
break;
}
return NULL;
}
} // Extern "C"

2
utils/rowgroup/rowaggregation.cpp Normal file → Executable file
View File

@@ -2671,7 +2671,7 @@ void RowAggregationUM::SetUDAFValue(static_any::any& valOut, int64_t colOut)
case execplan::CalpontSystemCatalog::MEDINT: case execplan::CalpontSystemCatalog::MEDINT:
case execplan::CalpontSystemCatalog::INT: case execplan::CalpontSystemCatalog::INT:
if (valOut.compatible(uintTypeId)) if (valOut.compatible(intTypeId))
{ {
intOut = valOut.cast<int>(); intOut = valOut.cast<int>();
bSetSuccess = true; bSetSuccess = true;

6
utils/udfsdk/mcsv1_udaf.cpp Normal file → Executable file
View File

@@ -218,6 +218,7 @@ void mcsv1Context::serialize(messageqcpp::ByteStream& b) const
// Dont send context flags, These are set for each call // Dont send context flags, These are set for each call
b << fUserDataSize; b << fUserDataSize;
b << (uint32_t)fResultType; b << (uint32_t)fResultType;
b << fColWidth;
b << fResultscale; b << fResultscale;
b << fResultPrecision; b << fResultPrecision;
b << errorMsg; b << errorMsg;
@@ -228,6 +229,7 @@ void mcsv1Context::serialize(messageqcpp::ByteStream& b) const
b << fStartConstant; b << fStartConstant;
b << fEndConstant; b << fEndConstant;
b << fParamCount; b << fParamCount;
b << (uint32_t)mariadbReturnType;
} }
void mcsv1Context::unserialize(messageqcpp::ByteStream& b) void mcsv1Context::unserialize(messageqcpp::ByteStream& b)
@@ -239,6 +241,7 @@ void mcsv1Context::unserialize(messageqcpp::ByteStream& b)
uint32_t iResultType; uint32_t iResultType;
b >> iResultType; b >> iResultType;
fResultType = (execplan::CalpontSystemCatalog::ColDataType)iResultType; fResultType = (execplan::CalpontSystemCatalog::ColDataType)iResultType;
b >> fColWidth;
b >> fResultscale; b >> fResultscale;
b >> fResultPrecision; b >> fResultPrecision;
b >> errorMsg; b >> errorMsg;
@@ -250,6 +253,9 @@ void mcsv1Context::unserialize(messageqcpp::ByteStream& b)
b >> fStartConstant; b >> fStartConstant;
b >> fEndConstant; b >> fEndConstant;
b >> fParamCount; b >> fParamCount;
uint32_t mrt;
b >> mrt;
mariadbReturnType = (enum_mariadb_return_type)mrt;
} }
void UserData::serialize(messageqcpp::ByteStream& bs) const void UserData::serialize(messageqcpp::ByteStream& bs) const

27
utils/udfsdk/mcsv1_udaf.h Normal file → Executable file
View File

@@ -86,6 +86,14 @@
namespace mcsv1sdk namespace mcsv1sdk
{ {
// The return type of the CREATE AGGREGATE statement
enum enum_mariadb_return_type {
MYSQL_TYPE_DOUBLE = 5,
MYSQL_TYPE_LONGLONG = 8,
MYSQL_TYPE_VARCHAR=15,
MYSQL_TYPE_NEWDECIMAL = 246
};
/** /**
* A map from name to function object. * A map from name to function object.
* *
@@ -303,7 +311,7 @@ public:
EXPORT int32_t getColWidth(); EXPORT int32_t getColWidth();
// For non-numric return types, set the return column width. This defaults // For non-numric return types, set the return column width. This defaults
// to the the length of the input. // to a length determined by the input datatype.
// valid in init() // valid in init()
EXPORT bool setColWidth(int32_t colWidth); EXPORT bool setColWidth(int32_t colWidth);
@@ -356,6 +364,9 @@ public:
// Get the name of the function // Get the name of the function
EXPORT const std::string& getName() const; EXPORT const std::string& getName() const;
// Get the return type as set by CREATE AGGREGATE FUNCTION
EXPORT enum_mariadb_return_type getMariaDBReturnType() const;
EXPORT mcsv1Context& operator=(const mcsv1Context& rhs); EXPORT mcsv1Context& operator=(const mcsv1Context& rhs);
EXPORT mcsv1Context& copy(const mcsv1Context& rhs); EXPORT mcsv1Context& copy(const mcsv1Context& rhs);
@@ -380,6 +391,7 @@ private:
mcsv1sdk::mcsv1_UDAF* func; mcsv1sdk::mcsv1_UDAF* func;
int32_t fParamCount; int32_t fParamCount;
std::vector<uint32_t> paramKeys; std::vector<uint32_t> paramKeys;
enum_mariadb_return_type mariadbReturnType;
public: public:
// For use by the framework // For use by the framework
@@ -403,6 +415,7 @@ public:
EXPORT boost::shared_ptr<UserData> getUserDataSP(); EXPORT boost::shared_ptr<UserData> getUserDataSP();
EXPORT void setParamCount(int32_t paramCount); EXPORT void setParamCount(int32_t paramCount);
std::vector<uint32_t>* getParamKeys(); std::vector<uint32_t>* getParamKeys();
EXPORT void setMariaDBReturnType(enum_mariadb_return_type rt);
}; };
// Since aggregate functions can operate on any data type, we use the following structure // Since aggregate functions can operate on any data type, we use the following structure
@@ -956,6 +969,16 @@ inline std::vector<uint32_t>* mcsv1Context::getParamKeys()
return &paramKeys; return &paramKeys;
} }
inline enum_mariadb_return_type mcsv1Context::getMariaDBReturnType() const
{
return mariadbReturnType;
}
inline void mcsv1Context::setMariaDBReturnType(enum_mariadb_return_type rt)
{
mariadbReturnType = rt;
}
inline mcsv1_UDAF::ReturnCode mcsv1_UDAF::dropValue(mcsv1Context* context, ColumnDatum* valsDropped) inline mcsv1_UDAF::ReturnCode mcsv1_UDAF::dropValue(mcsv1Context* context, ColumnDatum* valsDropped)
{ {
return NOT_IMPLEMENTED; return NOT_IMPLEMENTED;
@@ -968,8 +991,6 @@ inline mcsv1_UDAF::ReturnCode mcsv1_UDAF::createUserData(UserData*& userData, in
return SUCCESS; return SUCCESS;
} }
// Handy helper functions // Handy helper functions
template<typename T> template<typename T>
inline T mcsv1_UDAF::convertAnyTo(static_any::any& valIn) inline T mcsv1_UDAF::convertAnyTo(static_any::any& valIn)

View File

@@ -266,6 +266,7 @@ void CompareRule::compileRules(const std::vector<IdbSortSpec>& spec, const rowgr
case CalpontSystemCatalog::BIGINT: case CalpontSystemCatalog::BIGINT:
case CalpontSystemCatalog::DECIMAL: case CalpontSystemCatalog::DECIMAL:
case CalpontSystemCatalog::UDECIMAL: case CalpontSystemCatalog::UDECIMAL:
case CalpontSystemCatalog::TIME:
{ {
Compare* c = new IntCompare(*i); Compare* c = new IntCompare(*i);
fCompares.push_back(c); fCompares.push_back(c);
@@ -317,7 +318,6 @@ void CompareRule::compileRules(const std::vector<IdbSortSpec>& spec, const rowgr
case CalpontSystemCatalog::DATE: case CalpontSystemCatalog::DATE:
case CalpontSystemCatalog::DATETIME: case CalpontSystemCatalog::DATETIME:
case CalpontSystemCatalog::TIMESTAMP: case CalpontSystemCatalog::TIMESTAMP:
case CalpontSystemCatalog::TIME:
{ {
Compare* c = new UintCompare(*i); Compare* c = new UintCompare(*i);
fCompares.push_back(c); fCompares.push_back(c);