1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-13 23:02:14 +03:00

Merge pull request #742 from mariadb-corporation/develop-merge-up-20190425

Merge develop-1.2 into develop
This commit is contained in:
Roman Nozdrin
2019-04-25 12:56:00 +03:00
committed by GitHub
37 changed files with 508 additions and 322 deletions

View File

@@ -1,4 +1,4 @@
COLUMNSTORE_VERSION_MAJOR=1
COLUMNSTORE_VERSION_MINOR=3
COLUMNSTORE_VERSION_MINOR=4
COLUMNSTORE_VERSION_PATCH=0
COLUMNSTORE_VERSION_RELEASE=1

View File

@@ -2069,7 +2069,8 @@ void AlterTableProcessor::tableComment(uint32_t sessionID, execplan::CalpontSyst
}
else
{
throw std::runtime_error("Invalid table comment");
// Generic table comment, we don't need to do anything
return;
}
// Get the OID for autoinc (if exists)

View File

@@ -36,6 +36,7 @@
#endif
#include <cstring>
#include <boost/regex.hpp>
#include <boost/algorithm/string/trim.hpp>
#include "expressionparser.h"
#include "returnedcolumn.h"
@@ -456,16 +457,20 @@ inline bool PredicateOperator::getBoolVal(rowgroup::Row& row, bool& isNull, Retu
return !ret;
}
// MCOL-1559
std::string val1 = lop->getStrVal(row, isNull);
if (isNull)
return false;
const std::string& val1 = lop->getStrVal(row, isNull);
std::string val2 = rop->getStrVal(row, isNull);
if (isNull)
return false;
return strCompare(val1, rop->getStrVal(row, isNull)) && !isNull;
}
boost::trim_right_if(val1, boost::is_any_of(" "));
boost::trim_right_if(val2, boost::is_any_of(" "));
return strCompare(val1, val2);
}
//FIXME: ???
case execplan::CalpontSystemCatalog::VARBINARY:

View File

@@ -1636,7 +1636,6 @@ const JobStepVector doSimpleFilter(SimpleFilter* sf, JobInfo& jobInfo)
string constval(cc->constval());
CalpontSystemCatalog::OID dictOid = 0;
CalpontSystemCatalog::ColType ct = sc->colType();
const PseudoColumn* pc = dynamic_cast<const PseudoColumn*>(sc);

View File

@@ -749,6 +749,7 @@ bool LBIDList::CasualPartitionPredicate(const int64_t Min,
int64_t tMax = Max;
dataconvert::DataConvert::trimWhitespace(tMin);
dataconvert::DataConvert::trimWhitespace(tMax);
dataconvert::DataConvert::trimWhitespace(value);
scan = compareVal(order_swap(tMin), order_swap(tMax), order_swap(value),
op, lcf);

View File

@@ -98,4 +98,23 @@ BEGIN
SELECT CONCAT((SELECT SUM(data_size) FROM information_schema.columnstore_extents ce left join information_schema.columnstore_columns cc on ce.object_id = cc.object_id where compression_type='Snappy') / (SELECT SUM(compressed_data_size) FROM information_schema.columnstore_files WHERE compressed_data_size IS NOT NULL), ':1') COMPRESSION_RATIO;
END //
-- columnstore_upgrade: for every table that information_schema.tables reports
-- with engine='columnstore', run ALTER TABLE <schema>.<table> COMMENT=''
-- through a prepared statement. Takes no parameters and returns no result set.
create procedure columnstore_upgrade()
`columnstore_upgrade`: BEGIN
    DECLARE done INTEGER DEFAULT 0;
    -- Holds a fully quoted `schema`.`table` name. Each identifier can be up to
    -- 64 characters and may double in size when embedded backticks are escaped,
    -- plus 5 characters of quoting: 2 * 128 + 5 = 261.
    DECLARE schema_table VARCHAR(261) DEFAULT "";
    -- Backticks inside identifiers are doubled so that the quoted name stays
    -- valid when it is spliced into the dynamic ALTER TABLE statement below.
    DECLARE table_list CURSOR FOR
        select concat('`', replace(table_schema, '`', '``'), '`.`', replace(table_name, '`', '``'), '`')
        from information_schema.tables where engine='columnstore';
    -- FETCH past the last row raises NOT FOUND; turn that into a loop-exit flag.
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = 1;
    OPEN table_list;
    tlist: LOOP
        FETCH table_list INTO schema_table;
        IF done = 1 THEN LEAVE tlist;
        END IF;
        -- Table names cannot be bound as statement parameters, so the statement
        -- text is assembled and run via PREPARE/EXECUTE.
        SET @sql_query = concat('ALTER TABLE ', schema_table, ' COMMENT=\'\'');
        PREPARE stmt FROM @sql_query;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;
    END LOOP;
    -- Release the cursor explicitly instead of relying on end-of-block cleanup.
    CLOSE table_list;
END //
delimiter ;
DELIMITER ;

View File

@@ -2095,8 +2095,7 @@ int ha_calpont_impl_create_(const char* name, TABLE* table_arg, HA_CREATE_INFO*
if ( schemaSyncOnly && isCreate)
return rc;
//this is replicated DDL, treat it just like SSO
if (thd->slave_thread)
if (thd->slave_thread && !ci.replicationEnabled)
return rc;
//@bug 5660. Error out REAL DDL/DML on slave node.
@@ -2294,8 +2293,7 @@ int ha_calpont_impl_delete_table_(const char* db, const char* name, cal_connecti
return 0;
}
//this is replicated DDL, treat it just like SSO
if (thd->slave_thread)
if (thd->slave_thread && !ci.replicationEnabled)
return 0;
//@bug 5660. Error out REAL DDL/DML on slave node.
@@ -2434,8 +2432,7 @@ int ha_calpont_impl_rename_table_(const char* from, const char* to, cal_connecti
pair<string, string> toPair;
string stmt;
//this is replicated DDL, treat it just like SSO
if (thd->slave_thread)
if (thd->slave_thread && !ci.replicationEnabled)
return 0;
//@bug 5660. Error out REAL DDL/DML on slave node.

View File

@@ -2080,7 +2080,8 @@ int ha_calpont_impl_commit_ (handlerton* hton, THD* thd, bool all, cal_connectio
thd->infinidb_vtable.vtable_state == THD::INFINIDB_SELECT_VTABLE )
return rc;
if (thd->slave_thread) return 0;
if (thd->slave_thread && !ci.replicationEnabled)
return 0;
std::string command("COMMIT");
#ifdef INFINIDB_DEBUG

View File

@@ -7140,7 +7140,9 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i
// for subquery, order+limit by will be supported in infinidb. build order by columns
// @todo union order by and limit support
if (gwi.hasWindowFunc || gwi.subSelectType != CalpontSelectExecutionPlan::MAIN_SELECT)
if (gwi.hasWindowFunc
|| gwi.subSelectType != CalpontSelectExecutionPlan::MAIN_SELECT
|| ( isUnion && ordercol ))
{
for (; ordercol; ordercol = ordercol->next)
{

View File

@@ -1012,7 +1012,6 @@ uint32_t doUpdateDelete(THD* thd)
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
//@bug 5660. Error out DDL/DML on slave node, or on local query node
if (ci->isSlaveNode && !thd->slave_thread)
{
string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_DML_DDL_SLAVE);
@@ -1034,7 +1033,14 @@ uint32_t doUpdateDelete(THD* thd)
// stats start
ci->stats.reset();
ci->stats.setStartTime();
ci->stats.fUser = thd->main_security_ctx.user;
if (thd->main_security_ctx.user)
{
ci->stats.fUser = thd->main_security_ctx.user;
}
else
{
ci->stats.fUser = "";
}
if (thd->main_security_ctx.host)
ci->stats.fHost = thd->main_security_ctx.host;
@@ -2129,8 +2135,9 @@ int ha_calpont_impl_rnd_init(TABLE* table)
// prevent "create table as select" from running on slave
thd->infinidb_vtable.hasInfiniDBTable = true;
/* If this node is the slave, ignore DML to IDB tables */
if (thd->slave_thread && (
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (thd->slave_thread && !ci->replicationEnabled && (
thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
@@ -2183,8 +2190,6 @@ int ha_calpont_impl_rnd_init(TABLE* table)
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
idbassert(ci != 0);
// MySQL sometimes calls rnd_init multiple times, plan should only be
@@ -2296,7 +2301,14 @@ int ha_calpont_impl_rnd_init(TABLE* table)
{
ci->stats.reset(); // reset query stats
ci->stats.setStartTime();
ci->stats.fUser = thd->main_security_ctx.user;
if (thd->main_security_ctx.user)
{
ci->stats.fUser = thd->main_security_ctx.user;
}
else
{
ci->stats.fUser = "";
}
if (thd->main_security_ctx.host)
ci->stats.fHost = thd->main_security_ctx.host;
@@ -2661,8 +2673,9 @@ int ha_calpont_impl_rnd_next(uchar* buf, TABLE* table)
{
THD* thd = current_thd;
/* If this node is the slave, ignore DML to IDB tables */
if (thd->slave_thread && (
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (thd->slave_thread && !ci->replicationEnabled && (
thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
@@ -2673,7 +2686,6 @@ int ha_calpont_impl_rnd_next(uchar* buf, TABLE* table)
thd->lex->sql_command == SQLCOM_LOAD))
return 0;
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ERROR)
return ER_INTERNAL_ERROR;
@@ -2702,8 +2714,6 @@ int ha_calpont_impl_rnd_next(uchar* buf, TABLE* table)
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
// @bug 3078
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
{
@@ -2767,8 +2777,17 @@ int ha_calpont_impl_rnd_end(TABLE* table)
int rc = 0;
THD* thd = current_thd;
cal_connection_info* ci = NULL;
bool replicationEnabled = false;
if (thd->slave_thread && (
if (thd->infinidb_vtable.cal_conn_info)
ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
if (ci && ci->replicationEnabled)
{
replicationEnabled = true;
}
if (thd->slave_thread && !replicationEnabled && (
thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
@@ -2783,7 +2802,6 @@ int ha_calpont_impl_rnd_end(TABLE* table)
if (get_fe_conn_info_ptr() != NULL)
ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ORDER_BY )
{
thd->infinidb_vtable.vtable_state = THD::INFINIDB_SELECT_VTABLE; // flip back to normal state
@@ -3042,9 +3060,8 @@ int ha_calpont_impl_write_row(uchar* buf, TABLE* table)
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (thd->slave_thread) return 0;
if (thd->slave_thread && !ci->replicationEnabled)
return 0;
if (ci->alterTableState > 0) return 0;
@@ -3129,7 +3146,8 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table)
if (thd->infinidb_vtable.vtable_state != THD::INFINIDB_ALTER_VTABLE)
thd->infinidb_vtable.isInfiniDBDML = true;
if (thd->slave_thread) return;
if (thd->slave_thread && !ci->replicationEnabled)
return;
//@bug 5660. Error out DDL/DML on slave node, or on local query node
if (ci->isSlaveNode && thd->infinidb_vtable.vtable_state != THD::INFINIDB_ALTER_VTABLE)
@@ -3568,7 +3586,14 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table)
// query stats. only collect execution time and rows inserted for insert/load_data_infile
ci->stats.reset();
ci->stats.setStartTime();
ci->stats.fUser = thd->main_security_ctx.user;
if (thd->main_security_ctx.user)
{
ci->stats.fUser = thd->main_security_ctx.user;
}
else
{
ci->stats.fUser = "";
}
if (thd->main_security_ctx.host)
ci->stats.fHost = thd->main_security_ctx.host;
@@ -3654,7 +3679,8 @@ int ha_calpont_impl_end_bulk_insert(bool abort, TABLE* table)
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (thd->slave_thread) return 0;
if (thd->slave_thread && !ci->replicationEnabled)
return 0;
int rc = 0;
@@ -4317,7 +4343,14 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
{
ci->stats.reset(); // reset query stats
ci->stats.setStartTime();
ci->stats.fUser = thd->main_security_ctx.user;
if (thd->main_security_ctx.user)
{
ci->stats.fUser = thd->main_security_ctx.user;
}
else
{
ci->stats.fUser = "";
}
if (thd->main_security_ctx.host)
ci->stats.fHost = thd->main_security_ctx.host;
@@ -4736,19 +4769,6 @@ int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE
{
THD* thd = current_thd;
/* If this node is the slave, ignore DML to IDB tables */
if (thd->slave_thread && (
thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
return 0;
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ERROR)
return ER_INTERNAL_ERROR;
@@ -4774,6 +4794,17 @@ int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (thd->slave_thread && !ci->replicationEnabled && (
thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
return 0;
// @bug 3078
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
{
@@ -4839,7 +4870,19 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
THD* thd = current_thd;
cal_connection_info* ci = NULL;
if (thd->slave_thread && (
thd->infinidb_vtable.isNewQuery = true;
thd->infinidb_vtable.isUnion = false;
if (get_fe_conn_info_ptr() != NULL)
ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (!ci)
{
thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
}
if (thd->slave_thread && !ci->replicationEnabled && (
thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
@@ -4850,12 +4893,6 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
thd->lex->sql_command == SQLCOM_LOAD))
return 0;
thd->infinidb_vtable.isNewQuery = true;
thd->infinidb_vtable.isUnion = false;
if (get_fe_conn_info_ptr() != NULL)
ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (((thd->lex)->sql_command == SQLCOM_INSERT) ||
((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) )
{

View File

@@ -251,10 +251,18 @@ struct cal_connection_info
useXbit(false),
utf8(false),
useCpimport(1),
delimiter('\7')
delimiter('\7'),
replicationEnabled(false)
{
// check if this is a slave mysql daemon
isSlaveNode = checkSlave();
std::string option = config::Config::makeConfig()->getConfig("SystemConfig", "ReplicationEnabled");
if (!option.compare("Y"))
{
replicationEnabled = true;
}
}
static bool checkSlave()
@@ -319,6 +327,7 @@ struct cal_connection_info
char delimiter;
char enclosed_by;
std::vector <execplan::CalpontSystemCatalog::ColType> columnTypes;
bool replicationEnabled;
// MCOL-1101 remove compilation unit variable rmParms
std::vector <execplan::RMParam> rmParms;
};

View File

@@ -30,6 +30,7 @@
#include <unistd.h>
#include <algorithm>
#include <boost/algorithm/string/trim.hpp>
#include "bpp.h"
#include "primitiveserver.h"
@@ -93,6 +94,7 @@ void DictStep::createCommand(ByteStream& bs)
for (uint32_t i = 0; i < filterCount; i++)
{
bs >> strTmp;
boost::trim_right_if(strTmp, boost::is_any_of(" "));
//cout << " " << strTmp << endl;
eqFilter->insert(strTmp);
}

View File

@@ -28,6 +28,7 @@
#include <sys/stat.h>
#include <fcntl.h>
#include <stdexcept>
#include <boost/algorithm/string/trim.hpp>
//#define NDEBUG
#include <cassert>
#include <boost/thread.hpp>
@@ -1804,6 +1805,7 @@ private:
for (i = 0; i < count; i++)
{
*bs >> str;
boost::trim_right_if(str, boost::is_any_of(" "));
filter->insert(str);
}

View File

@@ -25,8 +25,7 @@
* Columnstore interface for the corr function
*
*
* CREATE AGGREGATE FUNCTION corr returns REAL
* soname 'libregr_mysql.so';
* CREATE AGGREGATE FUNCTION corr returns REAL soname 'libregr_mysql.so';
*
*/
#ifndef HEADER_corr

View File

@@ -25,8 +25,7 @@
* Columnstore interface for the covar_pop function
*
*
* CREATE AGGREGATE FUNCTION covar_pop returns REAL
* soname 'libregr_mysql.so';
* CREATE AGGREGATE FUNCTION covar_pop returns REAL soname 'libregr_mysql.so';
*
*/
#ifndef HEADER_covar_pop

View File

@@ -25,8 +25,7 @@
* Columnstore interface for the covar_samp function
*
*
* CREATE AGGREGATE FUNCTION covar_samp returns REAL
* soname 'libregr_mysql.so';
* CREATE AGGREGATE FUNCTION covar_samp returns REAL soname 'libregr_mysql.so';
*
*/
#ifndef HEADER_covar_samp

View File

@@ -63,13 +63,6 @@ mcsv1_UDAF::ReturnCode regr_avgx::init(mcsv1Context* context,
context->setErrorMessage("regr_avgx() with a non-numeric x argument");
return mcsv1_UDAF::ERROR;
}
if (!(isNumeric(colTypes[1].dataType)))
{
// The error message will be prepended with
// "The storage engine for the table doesn't support "
context->setErrorMessage("regr_avgx() with a non-numeric independant (second) argument");
return mcsv1_UDAF::ERROR;
}
context->setUserDataSize(sizeof(regr_avgx_data));
context->setResultType(CalpontSystemCatalog::DOUBLE);

View File

@@ -25,8 +25,7 @@
* Columnstore interface for the regr_avgx function
*
*
* CREATE AGGREGATE FUNCTION regr_avgx returns REAL soname
* 'libregr_mysql.so';
* CREATE AGGREGATE FUNCTION regr_avgx returns REAL soname 'libregr_mysql.so';
*
*/
#ifndef HEADER_regr_avgx

View File

@@ -60,14 +60,7 @@ mcsv1_UDAF::ReturnCode regr_avgy::init(mcsv1Context* context,
{
// The error message will be prepended with
// "The storage engine for the table doesn't support "
context->setErrorMessage("regr_avgy() with a non-numeric x argument");
return mcsv1_UDAF::ERROR;
}
if (!(isNumeric(colTypes[0].dataType)))
{
// The error message will be prepended with
// "The storage engine for the table doesn't support "
context->setErrorMessage("regr_avgy() with a non-numeric dependant (first) argument");
context->setErrorMessage("regr_avgy() with a non-numeric y argument");
return mcsv1_UDAF::ERROR;
}

View File

@@ -25,8 +25,7 @@
* Columnstore interface for the regr_avgy function
*
*
* CREATE AGGREGATE FUNCTION regr_avgy returns REAL soname
* 'libregr_mysql.so';
* CREATE AGGREGATE FUNCTION regr_avgy returns REAL soname 'libregr_mysql.so';
*
*/
#ifndef HEADER_regr_avgy

View File

@@ -25,8 +25,7 @@
* Columnstore interface for the regr_count function
*
*
* CREATE AGGREGATE FUNCTION regr_count returns INTEGER
* soname 'libregr_mysql.so';
* CREATE AGGREGATE FUNCTION regr_count returns INTEGER soname 'libregr_mysql.so';
*
*/
#ifndef HEADER_regr_count

View File

@@ -25,8 +25,7 @@
* Columnstore interface for the regr_intercept function
*
*
* CREATE AGGREGATE FUNCTION regr_intercept returns REAL
* soname 'libregr_mysql.so';
* CREATE AGGREGATE FUNCTION regr_intercept returns REAL soname 'libregr_mysql.so';
*
*/
#ifndef HEADER_regr_intercept

View File

@@ -25,8 +25,7 @@
* Columnstore interface for the regr_r2 function
*
*
* CREATE AGGREGATE FUNCTION regr_r2 returns REAL
* soname 'libregr_mysql.so';
* CREATE AGGREGATE FUNCTION regr_r2 returns REAL soname 'libregr_mysql.so';
*
*/
#ifndef HEADER_regr_r2

View File

@@ -25,8 +25,7 @@
* Columnstore interface for the regr_slope function
*
*
* CREATE AGGREGATE FUNCTION regr_slope returns REAL
* soname 'libregr_mysql.so';
* CREATE AGGREGATE FUNCTION regr_slope returns REAL soname 'libregr_mysql.so';
*
*/
#ifndef HEADER_regr_slope

View File

@@ -132,7 +132,7 @@ mcsv1_UDAF::ReturnCode regr_sxx::evaluate(mcsv1Context* context, static_any::any
long double sumx2 = data->sumx2;
long double var_popx = (sumx2 - (sumx * sumx / N)) / N;
valOut = static_cast<double>(data->cnt * var_popx);
valOut = static_cast<double>(N * var_popx);
}
return mcsv1_UDAF::SUCCESS;
}

View File

@@ -25,8 +25,7 @@
* Columnstore interface for for the regr_sxx function
*
*
* CREATE AGGREGATE FUNCTION regr_sxx returns REAL
* soname 'libregr_mysql.so';
* CREATE AGGREGATE FUNCTION regr_sxx returns REAL soname 'libregr_mysql.so';
*
*/
#ifndef HEADER_regr_sxx

View File

@@ -25,8 +25,7 @@
* Columnstore interface for the regr_sxy function
*
*
* CREATE AGGREGATE FUNCTION regr_sxy returns REAL
* soname 'libregr_mysql.so';
* CREATE AGGREGATE FUNCTION regr_sxy returns REAL soname 'libregr_mysql.so';
*
*/
#ifndef HEADER_regr_sxy

View File

@@ -132,7 +132,7 @@ mcsv1_UDAF::ReturnCode regr_syy::evaluate(mcsv1Context* context, static_any::any
long double sumy2 = data->sumy2;
long double var_popy = (sumy2 - (sumy * sumy / N)) / N;
valOut = static_cast<double>(data->cnt * var_popy);
valOut = static_cast<double>(N * var_popy);
}
return mcsv1_UDAF::SUCCESS;
}

View File

@@ -25,8 +25,7 @@
* Columnstore interface for the regr_syy function
*
*
* CREATE AGGREGATE FUNCTION regr_syy returns REAL
* soname 'libregr_mysql.so';
* CREATE AGGREGATE FUNCTION regr_syy returns REAL soname 'libregr_mysql.so';
*
*/
#ifndef HEADER_regr_syy

View File

@@ -147,7 +147,7 @@ extern "C"
*/
struct regr_avgx_data
{
double sumx;
long double sumx;
int64_t cnt;
};
@@ -159,21 +159,29 @@ extern "C"
struct regr_avgx_data* data;
if (args->arg_count != 2)
{
strcpy(message,"regr_avgx() requires two arguments");
return 1;
strcpy(message,"regr_avgx() requires two arguments");
return 1;
}
if (!(isNumeric(args->arg_type[1], args->attributes[1])))
{
strcpy(message,"regr_avgx() with a non-numeric independant (second) argument");
return 1;
}
if (args->arg_type[1] == DECIMAL_RESULT && initid->decimals != DECIMAL_NOT_SPECIFIED)
{
initid->decimals += 4;
}
else
{
initid->decimals = DECIMAL_NOT_SPECIFIED;
}
if (!(data = (struct regr_avgx_data*) malloc(sizeof(struct regr_avgx_data))))
if (!(data = (struct regr_avgx_data*) malloc(sizeof(struct regr_avgx_data))))
{
strmov(message,"Couldn't allocate memory");
return 1;
strmov(message,"Couldn't allocate memory");
return 1;
}
data->sumx = 0;
data->sumx = 0;
data->cnt = 0;
initid->ptr = (char*)data;
@@ -226,7 +234,17 @@ extern "C"
char* is_null, char* error __attribute__((unused)))
{
struct regr_avgx_data* data = (struct regr_avgx_data*)initid->ptr;
return data->sumx / data->cnt;
double valOut = 0;
if (data->cnt > 0)
{
valOut = static_cast<double>(data->sumx / data->cnt);
}
else
{
*is_null = 1;
}
return valOut;
}
//=======================================================================
@@ -236,8 +254,8 @@ extern "C"
*/
struct regr_avgy_data
{
double sumy;
int64_t cnt;
long double sumy;
int64_t cnt;
};
#ifdef _MSC_VER
@@ -248,8 +266,8 @@ extern "C"
struct regr_avgy_data* data;
if (args->arg_count != 2)
{
strcpy(message,"regr_avgy() requires two arguments");
return 1;
strcpy(message,"regr_avgy() requires two arguments");
return 1;
}
if (!(isNumeric(args->arg_type[0], args->attributes[0])))
{
@@ -257,10 +275,19 @@ extern "C"
return 1;
}
if (!(data = (struct regr_avgy_data*) malloc(sizeof(struct regr_avgy_data))))
if (args->arg_type[0] == DECIMAL_RESULT && initid->decimals != DECIMAL_NOT_SPECIFIED)
{
initid->decimals += 4;
}
else
{
initid->decimals = DECIMAL_NOT_SPECIFIED;
}
if (!(data = (struct regr_avgy_data*) malloc(sizeof(struct regr_avgy_data))))
{
strmov(message,"Couldn't allocate memory");
return 1;
strmov(message,"Couldn't allocate memory");
return 1;
}
data->sumy = 0;
data->cnt = 0;
@@ -315,7 +342,16 @@ extern "C"
char* is_null, char* error __attribute__((unused)))
{
struct regr_avgy_data* data = (struct regr_avgy_data*)initid->ptr;
return data->sumy / data->cnt;
double valOut = 0;
if (data->cnt > 0)
{
valOut = static_cast<double>(data->sumy / data->cnt);
}
else
{
*is_null = 1;
}
return valOut;
}
//=======================================================================
@@ -336,14 +372,14 @@ extern "C"
struct regr_count_data* data;
if (args->arg_count != 2)
{
strcpy(message,"regr_count() requires two arguments");
return 1;
strcpy(message,"regr_count() requires two arguments");
return 1;
}
if (!(data = (struct regr_count_data*) malloc(sizeof(struct regr_count_data))))
{
strmov(message,"Couldn't allocate memory");
return 1;
strmov(message,"Couldn't allocate memory");
return 1;
}
data->cnt = 0;
@@ -405,10 +441,10 @@ extern "C"
struct regr_slope_data
{
int64_t cnt;
double sumx;
double sumx2; // sum of (x squared)
double sumy;
double sumxy; // sum of (x*y)
long double sumx;
long double sumx2; // sum of (x squared)
long double sumy;
long double sumxy; // sum of (x*y)
};
#ifdef _MSC_VER
@@ -419,8 +455,8 @@ extern "C"
struct regr_slope_data* data;
if (args->arg_count != 2)
{
strcpy(message,"regr_slope() requires two arguments");
return 1;
strcpy(message,"regr_slope() requires two arguments");
return 1;
}
if (!(isNumeric(args->arg_type[0], args->attributes[0]) && isNumeric(args->arg_type[1], args->attributes[1])))
{
@@ -428,10 +464,12 @@ extern "C"
return 1;
}
initid->decimals = DECIMAL_NOT_SPECIFIED;
if (!(data = (struct regr_slope_data*) malloc(sizeof(struct regr_slope_data))))
{
strmov(message,"Couldn't allocate memory");
return 1;
strmov(message,"Couldn't allocate memory");
return 1;
}
data->cnt = 0;
data->sumx = 0.0;
@@ -497,20 +535,23 @@ extern "C"
{
struct regr_slope_data* data = (struct regr_slope_data*)initid->ptr;
double N = data->cnt;
double valOut = 0;
*is_null = 1;
if (N > 0)
{
double sumx = data->sumx;
double sumy = data->sumy;
double sumx2 = data->sumx2;
double sumxy = data->sumxy;
double variance = (N * sumx2) - (sumx * sumx);
if (variance)
long double sumx = data->sumx;
long double sumy = data->sumy;
long double sumx2 = data->sumx2;
long double sumxy = data->sumxy;
long double covar_pop = N * sumxy - sumx * sumy;
long double var_pop = N * sumx2 - sumx * sumx;
if (var_pop != 0)
{
return ((N * sumxy) - (sumx * sumy)) / variance;
valOut = static_cast<double>(covar_pop / var_pop);
*is_null = 0;
}
}
*is_null = 1;
return 0;
return valOut;
}
//=======================================================================
@@ -521,10 +562,10 @@ extern "C"
struct regr_intercept_data
{
int64_t cnt;
double sumx;
double sumx2; // sum of (x squared)
double sumy;
double sumxy; // sum of (x*y)
long double sumx;
long double sumx2; // sum of (x squared)
long double sumy;
long double sumxy; // sum of (x*y)
};
#ifdef _MSC_VER
@@ -535,8 +576,8 @@ extern "C"
struct regr_intercept_data* data;
if (args->arg_count != 2)
{
strcpy(message,"regr_intercept() requires two arguments");
return 1;
strcpy(message,"regr_intercept() requires two arguments");
return 1;
}
if (!(isNumeric(args->arg_type[0], args->attributes[0]) && isNumeric(args->arg_type[1], args->attributes[1])))
{
@@ -544,10 +585,11 @@ extern "C"
return 1;
}
if (!(data = (struct regr_intercept_data*) malloc(sizeof(struct regr_intercept_data))))
initid->decimals = DECIMAL_NOT_SPECIFIED;
if (!(data = (struct regr_intercept_data*) malloc(sizeof(struct regr_intercept_data))))
{
strmov(message,"Couldn't allocate memory");
return 1;
strmov(message,"Couldn't allocate memory");
return 1;
}
data->cnt = 0;
data->sumx = 0.0;
@@ -613,22 +655,23 @@ extern "C"
{
struct regr_intercept_data* data = (struct regr_intercept_data*)initid->ptr;
double N = data->cnt;
double valOut = 0;
*is_null = 1;
if (N > 0)
{
double sumx = data->sumx;
double sumy = data->sumy;
double sumx2 = data->sumx2;
double sumxy = data->sumxy;
double slope = 0;
double variance = (N * sumx2) - (sumx * sumx);
if (variance)
long double sumx = data->sumx;
long double sumy = data->sumy;
long double sumx2 = data->sumx2;
long double sumxy = data->sumxy;
long double numerator = sumy * sumx2 - sumx * sumxy;
long double var_pop = (N * sumx2) - (sumx * sumx);
if (var_pop != 0)
{
slope = ((N * sumxy) - (sumx * sumy)) / variance;
valOut = static_cast<double>(numerator / var_pop);
*is_null = 0;
}
return (sumy - (slope * sumx)) / N;
}
*is_null = 1;
return 0;
return valOut;
}
//=======================================================================
@@ -639,11 +682,11 @@ extern "C"
struct regr_r2_data
{
int64_t cnt;
double sumx;
double sumx2; // sum of (x squared)
double sumy;
double sumy2; // sum of (y squared)
double sumxy; // sum of (x*y)
long double sumx;
long double sumx2; // sum of (x squared)
long double sumy;
long double sumy2; // sum of (y squared)
long double sumxy; // sum of (x*y)
};
#ifdef _MSC_VER
@@ -654,8 +697,8 @@ extern "C"
struct regr_r2_data* data;
if (args->arg_count != 2)
{
strcpy(message,"regr_r2() requires two arguments");
return 1;
strcpy(message,"regr_r2() requires two arguments");
return 1;
}
if (!(isNumeric(args->arg_type[0], args->attributes[0]) && isNumeric(args->arg_type[1], args->attributes[1])))
{
@@ -663,10 +706,12 @@ extern "C"
return 1;
}
if (!(data = (struct regr_r2_data*) malloc(sizeof(struct regr_r2_data))))
initid->decimals = DECIMAL_NOT_SPECIFIED;
if (!(data = (struct regr_r2_data*) malloc(sizeof(struct regr_r2_data))))
{
strmov(message,"Couldn't allocate memory");
return 1;
strmov(message,"Couldn't allocate memory");
return 1;
}
data->cnt = 0;
data->sumx = 0.0;
@@ -735,34 +780,38 @@ extern "C"
{
struct regr_r2_data* data = (struct regr_r2_data*)initid->ptr;
double N = data->cnt;
double valOut = 0;
if (N > 0)
{
double sumx = data->sumx;
double sumy = data->sumy;
double sumx2 = data->sumx2;
double sumy2 = data->sumy2;
double sumxy = data->sumxy;
double var_popx = (sumx2 - (sumx * sumx / N)) / N;
long double sumx = data->sumx;
long double sumy = data->sumy;
long double sumx2 = data->sumx2;
long double sumy2 = data->sumy2;
long double sumxy = data->sumxy;
long double var_popx = (sumx2 - (sumx * sumx / N)) / N;
if (var_popx == 0)
{
// When var_popx is 0, NULL is the result.
*is_null = 1;
return 0;
}
double var_popy = (sumy2 - (sumy * sumy / N)) / N;
long double var_popy = (sumy2 - (sumy * sumy / N)) / N;
if (var_popy == 0)
{
// When var_popy is 0, 1 is the result
return 1;
}
double std_popx = sqrt(var_popx);
double std_popy = sqrt(var_popy);
double covar_pop = (sumxy - ((sumx * sumy) / N)) / N;
double corr = covar_pop / (std_popy * std_popx);
return corr * corr;
long double std_popx = sqrt(var_popx);
long double std_popy = sqrt(var_popy);
long double covar_pop = (sumxy - ((sumx * sumy) / N)) / N;
long double corr = covar_pop / (std_popy * std_popx);
valOut = static_cast<double>(corr * corr);
}
*is_null = 1;
return 0;
else
{
*is_null = 1;
}
return valOut;
}
//=======================================================================
@@ -773,11 +822,11 @@ extern "C"
struct corr_data
{
int64_t cnt;
double sumx;
double sumx2; // sum of (x squared)
double sumy;
double sumy2; // sum of (y squared)
double sumxy; // sum of (x*y)
long double sumx;
long double sumx2; // sum of (x squared)
long double sumy;
long double sumy2; // sum of (y squared)
long double sumxy; // sum of (x*y)
};
#ifdef _MSC_VER
@@ -788,8 +837,8 @@ extern "C"
struct corr_data* data;
if (args->arg_count != 2)
{
strcpy(message,"corr() requires two arguments");
return 1;
strcpy(message,"corr() requires two arguments");
return 1;
}
if (!(isNumeric(args->arg_type[0], args->attributes[0]) && isNumeric(args->arg_type[1], args->attributes[1])))
{
@@ -797,10 +846,12 @@ extern "C"
return 1;
}
if (!(data = (struct corr_data*) malloc(sizeof(struct corr_data))))
initid->decimals = DECIMAL_NOT_SPECIFIED;
if (!(data = (struct corr_data*) malloc(sizeof(struct corr_data))))
{
strmov(message,"Couldn't allocate memory");
return 1;
strmov(message,"Couldn't allocate memory");
return 1;
}
data->cnt = 0;
data->sumx = 0.0;
@@ -869,34 +920,38 @@ extern "C"
{
struct corr_data* data = (struct corr_data*)initid->ptr;
double N = data->cnt;
double valOut = 0;
if (N > 0)
{
double sumx = data->sumx;
double sumy = data->sumy;
double sumx2 = data->sumx2;
double sumy2 = data->sumy2;
double sumxy = data->sumxy;
double var_popx = (sumx2 - (sumx * sumx / N)) / N;
long double sumx = data->sumx;
long double sumy = data->sumy;
long double sumx2 = data->sumx2;
long double sumy2 = data->sumy2;
long double sumxy = data->sumxy;
long double var_popx = (sumx2 - (sumx * sumx / N)) / N;
if (var_popx == 0)
{
// When var_popx is 0, NULL is the result.
*is_null = 1;
return 0;
}
double var_popy = (sumy2 - (sumy * sumy / N)) / N;
long double var_popy = (sumy2 - (sumy * sumy / N)) / N;
if (var_popy == 0)
{
// When var_popy is 0, 1 is the result
return 1;
}
double std_popx = sqrt(var_popx);
double std_popy = sqrt(var_popy);
double covar_pop = (sumxy - ((sumx * sumy) / N)) / N;
double corr = covar_pop / (std_popy * std_popx);
return corr;
long double std_popx = sqrt(var_popx);
long double std_popy = sqrt(var_popy);
long double covar_pop = (sumxy - ((sumx * sumy) / N)) / N;
long double corr = covar_pop / (std_popy * std_popx);
return static_cast<double>(corr);
}
*is_null = 1;
return 0;
else
{
*is_null = 1;
}
return valOut;
}
//=======================================================================
@@ -907,8 +962,8 @@ extern "C"
struct regr_sxx_data
{
int64_t cnt;
double sumx;
double sumx2; // sum of (x squared)
long double sumx;
long double sumx2; // sum of (x squared)
};
#ifdef _MSC_VER
@@ -919,8 +974,8 @@ extern "C"
struct regr_sxx_data* data;
if (args->arg_count != 2)
{
strcpy(message,"regr_sxx() requires two arguments");
return 1;
strcpy(message,"regr_sxx() requires two arguments");
return 1;
}
if (!(isNumeric(args->arg_type[1], args->attributes[1])))
{
@@ -928,10 +983,12 @@ extern "C"
return 1;
}
if (!(data = (struct regr_sxx_data*) malloc(sizeof(struct regr_sxx_data))))
initid->decimals = DECIMAL_NOT_SPECIFIED;
if (!(data = (struct regr_sxx_data*) malloc(sizeof(struct regr_sxx_data))))
{
strmov(message,"Couldn't allocate memory");
return 1;
strmov(message,"Couldn't allocate memory");
return 1;
}
data->cnt = 0;
data->sumx = 0.0;
@@ -990,15 +1047,19 @@ extern "C"
{
struct regr_sxx_data* data = (struct regr_sxx_data*)initid->ptr;
double N = data->cnt;
double valOut = 0;
if (N > 0)
{
double sumx = data->sumx;
double sumx2 = data->sumx2;
double var_popx = (sumx2 - (sumx * sumx / N)) / N;
return data->cnt * var_popx;
long double sumx = data->sumx;
long double sumx2 = data->sumx2;
long double var_popx = (sumx2 - (sumx * sumx / N)) / N;
valOut = static_cast<double>(N * var_popx);
}
*is_null = 1;
return 0;
else
{
*is_null = 1;
}
return valOut;
}
//=======================================================================
@@ -1008,8 +1069,8 @@ extern "C"
struct regr_syy_data
{
int64_t cnt;
double sumy;
double sumy2; // sum of (y squared)
long double sumy;
long double sumy2; // sum of (y squared)
};
#ifdef _MSC_VER
@@ -1020,8 +1081,8 @@ extern "C"
struct regr_syy_data* data;
if (args->arg_count != 2)
{
strcpy(message,"regr_syy() requires two arguments");
return 1;
strcpy(message,"regr_syy() requires two arguments");
return 1;
}
if (!(isNumeric(args->arg_type[0], args->attributes[0])))
{
@@ -1029,10 +1090,12 @@ extern "C"
return 1;
}
if (!(data = (struct regr_syy_data*) malloc(sizeof(struct regr_syy_data))))
initid->decimals = DECIMAL_NOT_SPECIFIED;
if (!(data = (struct regr_syy_data*) malloc(sizeof(struct regr_syy_data))))
{
strmov(message,"Couldn't allocate memory");
return 1;
strmov(message,"Couldn't allocate memory");
return 1;
}
data->cnt = 0;
data->sumy = 0.0;
@@ -1091,15 +1154,19 @@ extern "C"
{
struct regr_syy_data* data = (struct regr_syy_data*)initid->ptr;
double N = data->cnt;
double valOut = 0;
if (N > 0)
{
double sumy = data->sumy;
double sumy2 = data->sumy2;
double var_popy = (sumy2 - (sumy * sumy / N)) / N;
return data->cnt * var_popy;
long double sumy = data->sumy;
long double sumy2 = data->sumy2;
long double var_popy = (sumy2 - (sumy * sumy / N)) / N;
valOut = static_cast<double>(N * var_popy);
}
*is_null = 1;
return 0;
else
{
*is_null = 1;
}
return valOut;
}
//=======================================================================
@@ -1110,9 +1177,9 @@ extern "C"
struct regr_sxy_data
{
int64_t cnt;
double sumx;
double sumy;
double sumxy; // sum of (x*y)
long double sumx;
long double sumy;
long double sumxy; // sum of (x*y)
};
#ifdef _MSC_VER
@@ -1123,8 +1190,8 @@ extern "C"
struct regr_sxy_data* data;
if (args->arg_count != 2)
{
strcpy(message,"regr_sxy() requires two arguments");
return 1;
strcpy(message,"regr_sxy() requires two arguments");
return 1;
}
if (!(isNumeric(args->arg_type[0], args->attributes[0]) && isNumeric(args->arg_type[1], args->attributes[1])))
{
@@ -1132,10 +1199,12 @@ extern "C"
return 1;
}
if (!(data = (struct regr_sxy_data*) malloc(sizeof(struct regr_sxy_data))))
initid->decimals = DECIMAL_NOT_SPECIFIED;
if (!(data = (struct regr_sxy_data*) malloc(sizeof(struct regr_sxy_data))))
{
strmov(message,"Couldn't allocate memory");
return 1;
strmov(message,"Couldn't allocate memory");
return 1;
}
data->cnt = 0;
data->sumx = 0.0;
@@ -1198,16 +1267,21 @@ extern "C"
{
struct regr_sxy_data* data = (struct regr_sxy_data*)initid->ptr;
double N = data->cnt;
double valOut = 0;
if (N > 0)
{
double sumx = data->sumx;
double sumy = data->sumy;
double sumxy = data->sumxy;
double covar_pop = (sumxy - ((sumx * sumy) / N)) / N;
return data->cnt * covar_pop;
long double sumx = data->sumx;
long double sumy = data->sumy;
long double sumxy = data->sumxy;
long double covar_pop = (sumxy - ((sumx * sumy) / N)) / N;
long double regr_sxy = N * covar_pop;
valOut = static_cast<double>(regr_sxy);
}
*is_null = 1;
return 0;
else
{
*is_null = 1;
}
return valOut;
}
//=======================================================================
@@ -1218,9 +1292,9 @@ extern "C"
struct covar_pop_data
{
int64_t cnt;
double sumx;
double sumy;
double sumxy; // sum of (x*y)
long double sumx;
long double sumy;
long double sumxy; // sum of (x*y)
};
#ifdef _MSC_VER
@@ -1231,8 +1305,8 @@ extern "C"
struct covar_pop_data* data;
if (args->arg_count != 2)
{
strcpy(message,"covar_pop() requires two arguments");
return 1;
strcpy(message,"covar_pop() requires two arguments");
return 1;
}
if (!(isNumeric(args->arg_type[0], args->attributes[0]) && isNumeric(args->arg_type[1], args->attributes[1])))
{
@@ -1240,10 +1314,12 @@ extern "C"
return 1;
}
if (!(data = (struct covar_pop_data*) malloc(sizeof(struct covar_pop_data))))
initid->decimals = DECIMAL_NOT_SPECIFIED;
if (!(data = (struct covar_pop_data*) malloc(sizeof(struct covar_pop_data))))
{
strmov(message,"Couldn't allocate memory");
return 1;
strmov(message,"Couldn't allocate memory");
return 1;
}
data->cnt = 0;
data->sumx = 0.0;
@@ -1306,16 +1382,20 @@ extern "C"
{
struct covar_pop_data* data = (struct covar_pop_data*)initid->ptr;
double N = data->cnt;
double valOut = 0;
if (N > 0)
{
double sumx = data->sumx;
double sumy = data->sumy;
double sumxy = data->sumxy;
double covar_pop = (sumxy - ((sumx * sumy) / N)) / N;
return covar_pop;
long double sumx = data->sumx;
long double sumy = data->sumy;
long double sumxy = data->sumxy;
long double covar_pop = (sumxy - ((sumx * sumy) / N)) / N ;
valOut = static_cast<double>(covar_pop);
}
*is_null = 1;
return 0;
else
{
*is_null = 1;
}
return valOut;
}
//=======================================================================
@@ -1325,9 +1405,9 @@ extern "C"
struct covar_samp_data
{
int64_t cnt;
double sumx;
double sumy;
double sumxy; // sum of (x*y)
long double sumx;
long double sumy;
long double sumxy; // sum of (x*y)
};
#ifdef _MSC_VER
@@ -1338,8 +1418,8 @@ extern "C"
struct covar_samp_data* data;
if (args->arg_count != 2)
{
strcpy(message,"covar_samp() requires two arguments");
return 1;
strcpy(message,"covar_samp() requires two arguments");
return 1;
}
if (!(isNumeric(args->arg_type[0], args->attributes[0]) && isNumeric(args->arg_type[1], args->attributes[1])))
{
@@ -1347,10 +1427,12 @@ extern "C"
return 1;
}
if (!(data = (struct covar_samp_data*) malloc(sizeof(struct covar_samp_data))))
initid->decimals = DECIMAL_NOT_SPECIFIED;
if (!(data = (struct covar_samp_data*) malloc(sizeof(struct covar_samp_data))))
{
strmov(message,"Couldn't allocate memory");
return 1;
strmov(message,"Couldn't allocate memory");
return 1;
}
data->cnt = 0;
data->sumx = 0.0;
@@ -1413,16 +1495,20 @@ extern "C"
{
struct covar_samp_data* data = (struct covar_samp_data*)initid->ptr;
double N = data->cnt;
double valOut = 0;
if (N > 0)
{
double sumx = data->sumx;
double sumy = data->sumy;
double sumxy = data->sumxy;
double covar_samp = (sumxy - ((sumx * sumy) / N)) / (N-1);
return covar_samp;
long double sumx = data->sumx;
long double sumy = data->sumy;
long double sumxy = data->sumxy;
long double covar_samp = (sumxy - ((sumx * sumy) / N)) / (N - 1);
valOut = static_cast<double>(covar_samp);
}
*is_null = 1;
return 0;
else
{
*is_null = 1;
}
return valOut;
}
}
// vim:ts=4 sw=4:

View File

@@ -218,7 +218,6 @@ inline string getStringNullValue()
namespace rowgroup
{
const std::string typeStr("");
const static_any::any& RowAggregation::charTypeId((char)1);
const static_any::any& RowAggregation::scharTypeId((signed char)1);
@@ -590,7 +589,8 @@ inline bool RowAggregation::isNull(const RowGroup* pRowGroup, const Row& row, in
RowAggregation::RowAggregation() :
fAggMapPtr(NULL), fRowGroupOut(NULL),
fTotalRowCount(0), fMaxTotalRowCount(AGG_ROWGROUP_SIZE),
fSmallSideRGs(NULL), fLargeSideRG(NULL), fSmallSideCount(0)
fSmallSideRGs(NULL), fLargeSideRG(NULL), fSmallSideCount(0),
fOrigFunctionCols(NULL)
{
}
@@ -599,7 +599,8 @@ RowAggregation::RowAggregation(const vector<SP_ROWAGG_GRPBY_t>& rowAggGroupByCol
const vector<SP_ROWAGG_FUNC_t>& rowAggFunctionCols) :
fAggMapPtr(NULL), fRowGroupOut(NULL),
fTotalRowCount(0), fMaxTotalRowCount(AGG_ROWGROUP_SIZE),
fSmallSideRGs(NULL), fLargeSideRG(NULL), fSmallSideCount(0)
fSmallSideRGs(NULL), fLargeSideRG(NULL), fSmallSideCount(0),
fOrigFunctionCols(NULL)
{
fGroupByCols.assign(rowAggGroupByCols.begin(), rowAggGroupByCols.end());
fFunctionCols.assign(rowAggFunctionCols.begin(), rowAggFunctionCols.end());
@@ -610,7 +611,7 @@ RowAggregation::RowAggregation(const RowAggregation& rhs):
fAggMapPtr(NULL), fRowGroupOut(NULL),
fTotalRowCount(0), fMaxTotalRowCount(AGG_ROWGROUP_SIZE),
fSmallSideRGs(NULL), fLargeSideRG(NULL), fSmallSideCount(0),
fRGContext(rhs.fRGContext)
fRGContext(rhs.fRGContext), fOrigFunctionCols(NULL)
{
//fGroupByCols.clear();
//fFunctionCols.clear();
@@ -714,11 +715,8 @@ void RowAggregation::setJoinRowGroups(vector<RowGroup>* pSmallSideRG, RowGroup*
// threads on the PM and by multiple threads on the UM. It must remain
// thread safe.
//------------------------------------------------------------------------------
void RowAggregation::resetUDAF(uint64_t funcColID)
void RowAggregation::resetUDAF(RowUDAFFunctionCol* rowUDAF)
{
// Get the UDAF class pointer and store in the row definition object.
RowUDAFFunctionCol* rowUDAF = dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[funcColID].get());
// RowAggregation and its functions need to be re-entrant which means
// each instance (thread) needs its own copy of the context object.
// Note: operator=() doesn't copy userData.
@@ -786,7 +784,7 @@ void RowAggregation::initialize()
{
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
{
resetUDAF(i);
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get()));
}
}
}
@@ -838,7 +836,7 @@ void RowAggregation::aggReset()
{
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
{
resetUDAF(i);
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get()));
}
}
}
@@ -885,14 +883,28 @@ void RowAggregationUM::aggregateRowWithRemap(Row& row)
inserted.first->second = RowPosition(fResultDataVec.size() - 1, fRowGroupOut->getRowCount() - 1);
// If there's UDAF involved, reset the user data.
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
if (fOrigFunctionCols)
{
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
// This is a multi-distinct query and fFunctionCols may not
// contain all the UDAF we need to reset
for (uint64_t i = 0; i < fOrigFunctionCols->size(); i++)
{
resetUDAF(i);
if ((*fOrigFunctionCols)[i]->fAggFunction == ROWAGG_UDAF)
{
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>((*fOrigFunctionCols)[i].get()));
}
}
}
else
{
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
{
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
{
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get()));
}
}
}
// replace the key value with an equivalent copy, yes this is OK
const_cast<RowPosition&>((inserted.first->first)) = pos;
}
@@ -946,14 +958,28 @@ void RowAggregation::aggregateRow(Row& row)
RowPosition(fResultDataVec.size() - 1, fRowGroupOut->getRowCount() - 1);
// If there's UDAF involved, reset the user data.
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
if (fOrigFunctionCols)
{
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
// This is a multi-distinct query and fFunctionCols may not
// contain all the UDAF we need to reset
for (uint64_t i = 0; i < fOrigFunctionCols->size(); i++)
{
resetUDAF(i);
if ((*fOrigFunctionCols)[i]->fAggFunction == ROWAGG_UDAF)
{
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>((*fOrigFunctionCols)[i].get()));
}
}
}
else
{
for (uint64_t i = 0; i < fFunctionCols.size(); i++)
{
if (fFunctionCols[i]->fAggFunction == ROWAGG_UDAF)
{
resetUDAF(dynamic_cast<RowUDAFFunctionCol*>(fFunctionCols[i].get()));
}
}
}
}
else
{
@@ -4699,7 +4725,7 @@ void RowAggregationMultiDistinct::doDistinctAggregation()
{
// backup the function column vector for finalize().
vector<SP_ROWAGG_FUNC_t> origFunctionCols = fFunctionCols;
fOrigFunctionCols = &origFunctionCols;
// aggregate data from each sub-aggregator to distinct aggregator
for (uint64_t i = 0; i < fSubAggregators.size(); ++i)
{
@@ -4727,6 +4753,7 @@ void RowAggregationMultiDistinct::doDistinctAggregation()
// restore the function column vector
fFunctionCols = origFunctionCols;
fOrigFunctionCols = NULL;
}
@@ -4734,6 +4761,7 @@ void RowAggregationMultiDistinct::doDistinctAggregation_rowVec(vector<vector<Row
{
// backup the function column vector for finalize().
vector<SP_ROWAGG_FUNC_t> origFunctionCols = fFunctionCols;
fOrigFunctionCols = &origFunctionCols;
// aggregate data from each sub-aggregator to distinct aggregator
for (uint64_t i = 0; i < fSubAggregators.size(); ++i)
@@ -4751,9 +4779,9 @@ void RowAggregationMultiDistinct::doDistinctAggregation_rowVec(vector<vector<Row
inRows[i].clear();
}
// restore the function column vector
fFunctionCols = origFunctionCols;
fOrigFunctionCols = NULL;
}

View File

@@ -630,7 +630,7 @@ protected:
if (fAggMapPtr) fAggMapPtr->clear();
}
void resetUDAF(uint64_t funcColID);
void resetUDAF(RowUDAFFunctionCol* rowUDAF);
inline bool isNull(const RowGroup* pRowGroup, const Row& row, int64_t col);
inline void makeAggFieldsNull(Row& row);
@@ -710,6 +710,9 @@ protected:
static const static_any::any& doubleTypeId;
static const static_any::any& longdoubleTypeId;
static const static_any::any& strTypeId;
// For UDAF along with multiple distinct columns
vector<SP_ROWAGG_FUNC_t>* fOrigFunctionCols;
};
//------------------------------------------------------------------------------

View File

@@ -1176,7 +1176,12 @@ inline bool Row::equals(const Row& r2, const std::vector<uint32_t>& keyCols) con
if (!isLongString(col))
{
if (getUintField(col) != r2.getUintField(col))
if (getColType(i) == execplan::CalpontSystemCatalog::LONGDOUBLE)
{
if (getLongDoubleField(i) != r2.getLongDoubleField(i))
return false;
}
else if (getUintField(col) != r2.getUintField(col))
return false;
}
else
@@ -1204,7 +1209,12 @@ inline bool Row::equals(const Row& r2, uint32_t lastCol) const
for (uint32_t i = 0; i <= lastCol; i++)
if (!isLongString(i))
{
if (getUintField(i) != r2.getUintField(i))
if (getColType(i) == execplan::CalpontSystemCatalog::LONGDOUBLE)
{
if (getLongDoubleField(i) != r2.getLongDoubleField(i))
return false;
}
else if (getUintField(i) != r2.getUintField(i))
return false;
}
else

View File

@@ -45,7 +45,7 @@ namespace
{
// Minimum time to wait for a condition, so as to periodically wake up and
// check the global job status, to see if the job needs to terminate.
const int COND_WAIT_SECONDS = 3;
const int COND_WAIT_SECONDS = 1;
}
namespace WriteEngine

View File

@@ -29,6 +29,7 @@
#include <cstdlib>
#include <cstring>
#include <vector>
#include <set>
#include <sstream>
#include <inttypes.h>
#include <iostream>
@@ -108,7 +109,6 @@ Dctnry::Dctnry() :
&m_endHeader, HDR_UNIT_SIZE);
m_curFbo = INVALID_NUM;
m_curLbid = INVALID_LBID;
memset(m_sigArray, 0, MAX_STRING_CACHE_SIZE * sizeof(Signature));
m_arraySize = 0;
clear();//files
@@ -130,14 +130,16 @@ Dctnry::~Dctnry()
******************************************************************************/
void Dctnry::freeStringCache( )
{
for (int i = 0; i < m_arraySize; i++)
std::set<Signature,sig_compare>::iterator it;
for (it=m_sigArray.begin(); it!=m_sigArray.end(); it++)
{
delete [] m_sigArray[i].signature;
m_sigArray[i].signature = 0;
Signature sig = *it;
delete [] sig.signature;
sig.signature = 0;
}
memset(m_sigArray, 0, MAX_STRING_CACHE_SIZE * sizeof(Signature));
m_arraySize = 0;
m_sigArray.clear();
}
/*******************************************************************************
@@ -161,7 +163,6 @@ int Dctnry::init()
m_curOp = 0;
memset( m_curBlock.data, 0, sizeof(m_curBlock.data));
m_curBlock.lbid = INVALID_LBID;
memset(m_sigArray, 0, MAX_STRING_CACHE_SIZE * sizeof(Signature));
m_arraySize = 0;
return NO_ERROR;
@@ -627,19 +628,17 @@ int Dctnry::openDctnry(const OID& dctnryOID,
******************************************************************************/
bool Dctnry::getTokenFromArray(Signature& sig)
{
for (int i = 0; i < (int)m_arraySize ; i++ )
{
if (sig.size == m_sigArray[i].size)
{
if (!memcmp(sig.signature, m_sigArray[i].signature, sig.size))
{
sig.token = m_sigArray[i].token;
return true;
}//endif sig compare
}//endif size compare
}
std::set<Signature,sig_compare>::iterator it;
it = m_sigArray.find(sig);
if ( it == m_sigArray.end()){
return false;
}else{
Signature sigfound = *it;
sig.token = sigfound.token;
return true;
}
return false;
return false;
}
/*******************************************************************************
@@ -1333,7 +1332,7 @@ void Dctnry::preLoadStringCache( const DataBlock& fileBlock )
memcpy(aSig.signature, &fileBlock.data[offBeg], len);
aSig.token.op = op;
aSig.token.fbo = m_curLbid;
m_sigArray[op - 1] = aSig;
m_sigArray.insert(aSig);
offEnd = offBeg;
hdrOffsetBeg += HDR_UNIT_SIZE;
@@ -1372,7 +1371,7 @@ void Dctnry::addToStringCache( const Signature& newSig )
memcpy(asig.signature, newSig.signature, newSig.size );
asig.size = newSig.size;
asig.token = newSig.token;
m_sigArray[m_arraySize] = asig;
m_sigArray.insert(asig);
m_arraySize++;
}
@@ -1465,7 +1464,7 @@ int Dctnry::updateDctnry(unsigned char* sigValue, int& sigSize,
sig.signature = new unsigned char[sigSize];
memcpy (sig.signature, sigValue, sigSize);
sig.token = token;
m_sigArray[m_arraySize] = sig;
m_sigArray.insert(sig);
m_arraySize++;
}

View File

@@ -56,6 +56,17 @@ typedef struct Signature
Token token;
} Signature;
/**
 * @brief Ordering predicate for Signature objects.
 *
 * Signatures are ordered by length first (shorter sorts before longer);
 * signatures of equal length are ordered by a byte-wise comparison of
 * their contents. The token member takes no part in the ordering.
 */
struct sig_compare {
    bool operator() (const Signature& a, const Signature& b) const {
        // Different lengths: the length alone decides the order.
        if (a.size != b.size)
            return a.size < b.size;

        // Same length: fall back to the raw bytes.
        return memcmp(a.signature, b.signature, a.size) < 0;
    }
};
/**
* @brief Class to interface with dictionary store files.
*/
@@ -300,7 +311,7 @@ protected:
virtual void closeDctnryFile(bool doFlush, std::map<FID, FID>& oids);
virtual int numOfBlocksInFile();
Signature m_sigArray[MAX_STRING_CACHE_SIZE]; // string cache
std::set<Signature,sig_compare> m_sigArray;
int m_arraySize; // num strings in m_sigArray
// m_dctnryHeader used for hdr when readSubBlockEntry is used to read a blk

View File

@@ -133,7 +133,7 @@ const int DctnryStore::updateDctnryStore(unsigned char* sigValue,
sig.signature = new unsigned char[sigSize];
memcpy (sig.signature, sigValue, sigSize);
sig.token = token;
m_dctnry.m_sigArray[m_dctnry.m_arraySize] = sig;
m_dctnry.m_sigArray.insert(sig) = sig;
m_dctnry.m_arraySize++;
}