You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-08 14:22:09 +03:00
Merge branch 'develop' into MCOL-520
This commit is contained in:
@@ -123,6 +123,9 @@ static int calpont_rollback(handlerton* hton, THD* thd, bool all);
|
||||
static int calpont_close_connection ( handlerton* hton, THD* thd );
|
||||
handlerton* calpont_hton;
|
||||
|
||||
static group_by_handler*
|
||||
create_calpont_group_by_handler(THD* thd, Query* query);
|
||||
|
||||
/* Variables for example share methods */
|
||||
|
||||
/*
|
||||
@@ -218,6 +221,7 @@ static int columnstore_init_func(void* p)
|
||||
calpont_hton->commit = calpont_commit;
|
||||
calpont_hton->rollback = calpont_rollback;
|
||||
calpont_hton->close_connection = calpont_close_connection;
|
||||
calpont_hton->create_group_by = create_calpont_group_by_handler;
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
@@ -1135,6 +1139,86 @@ static MYSQL_SYSVAR_ULONG(
|
||||
0);
|
||||
#endif
|
||||
|
||||
/*@brief create_calpont_group_by_handler- Creates handler*/
|
||||
/***********************************************************
|
||||
* DESCRIPTION:
|
||||
* Creates a group_by pushdown handler.
|
||||
* Details are in server/sql/group_by_handler.h
|
||||
* PARAMETERS:
|
||||
* thd - THD pointer.
|
||||
* query - Query structure, that describes the pushdowned query.
|
||||
* RETURN:
|
||||
* group_by_handler if success
|
||||
* NULL in other case
|
||||
***********************************************************/
|
||||
static group_by_handler*
|
||||
create_calpont_group_by_handler(THD* thd, Query* query)
|
||||
{
|
||||
ha_calpont_group_by_handler* handler = NULL;
|
||||
|
||||
if ( thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE )
|
||||
{
|
||||
handler = new ha_calpont_group_by_handler(thd, query);
|
||||
|
||||
// Notify the server, that CS handles GROUP BY, ORDER BY and HAVING clauses.
|
||||
query->group_by = NULL;
|
||||
query->order_by = NULL;
|
||||
query->having = NULL;
|
||||
}
|
||||
|
||||
return handler;
|
||||
}
|
||||
|
||||
/***********************************************************
|
||||
* DESCRIPTION:
|
||||
* Makes the plan and prepares the data
|
||||
* RETURN:
|
||||
* int rc
|
||||
***********************************************************/
|
||||
int ha_calpont_group_by_handler::init_scan()
|
||||
{
|
||||
DBUG_ENTER("ha_calpont_group_by_handler::init_scan");
|
||||
|
||||
// Save vtable_state to restore the after we inited.
|
||||
THD::infinidb_state oldState = thd->infinidb_vtable.vtable_state;
|
||||
// MCOL-1052 Should be removed after cleaning the code up.
|
||||
thd->infinidb_vtable.vtable_state = THD::INFINIDB_CREATE_VTABLE;
|
||||
int rc = ha_calpont_impl_group_by_init(this, table);
|
||||
thd->infinidb_vtable.vtable_state = oldState;
|
||||
|
||||
DBUG_RETURN(rc);
|
||||
}
|
||||
|
||||
/***********************************************************
|
||||
* DESCRIPTION:
|
||||
* Fetches a row and saves it to a temporary table.
|
||||
* RETURN:
|
||||
* int rc
|
||||
***********************************************************/
|
||||
int ha_calpont_group_by_handler::next_row()
|
||||
{
|
||||
DBUG_ENTER("ha_calpont_group_by_handler::next_row");
|
||||
int rc = ha_calpont_impl_group_by_next(this, table);
|
||||
|
||||
DBUG_RETURN(rc);
|
||||
}
|
||||
|
||||
/***********************************************************
|
||||
* DESCRIPTION:
|
||||
* Shuts the scan down.
|
||||
* RETURN:
|
||||
* int rc
|
||||
***********************************************************/
|
||||
int ha_calpont_group_by_handler::end_scan()
|
||||
{
|
||||
DBUG_ENTER("ha_calpont_group_by_handler::end_scan");
|
||||
|
||||
int rc = ha_calpont_impl_group_by_end(this, table);
|
||||
|
||||
DBUG_RETURN(rc);
|
||||
}
|
||||
|
||||
|
||||
static struct st_mysql_sys_var* calpont_system_variables[] =
|
||||
{
|
||||
// MYSQL_SYSVAR(enum_var),
|
||||
|
@@ -40,6 +40,8 @@
|
||||
#include <my_config.h>
|
||||
#include "idb_mysql.h"
|
||||
|
||||
extern handlerton* calpont_hton;
|
||||
|
||||
/** @brief
|
||||
EXAMPLE_SHARE is a structure that will be shared among all open handlers.
|
||||
This example implements the minimum of what you will probably need.
|
||||
@@ -245,5 +247,56 @@ public:
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
/*@brief group_by_handler class*/
|
||||
/***********************************************************
|
||||
* DESCRIPTION:
|
||||
* Provides server with group_by_handler API methods.
|
||||
* One should read comments in server/sql/group_by_handler.h
|
||||
* Attributes:
|
||||
* select - attribute contains all GROUP BY, HAVING, ORDER items and calls it
|
||||
* an extended SELECT list accordin to comments in
|
||||
* server/sql/group_handler.cc.
|
||||
* So the temporary table for
|
||||
* select count(*) from b group by a having a > 3 order by a
|
||||
* will have 4 columns not 1.
|
||||
* However server ignores all NULLs used in GROUP BY, HAVING, ORDER.
|
||||
* table_list - contains all tables involved. Must be CS tables only.
|
||||
* distinct - looks like a useless thing for now. Couldn't get it set by server.
|
||||
* where - where items.
|
||||
* group_by - group by ORDER items.
|
||||
* order_by - order by ORDER items.
|
||||
* having - having Item.
|
||||
* Methods:
|
||||
* init_scan - get plan and send it to ExeMgr. Get the execution result.
|
||||
* next_row - get a row back from sm.
|
||||
* end_scan - finish and clean the things up.
|
||||
***********************************************************/
|
||||
class ha_calpont_group_by_handler: public group_by_handler
|
||||
{
|
||||
public:
|
||||
ha_calpont_group_by_handler(THD* thd_arg, Query* query)
|
||||
: group_by_handler(thd_arg, calpont_hton),
|
||||
select(query->select),
|
||||
table_list(query->from),
|
||||
distinct(query->distinct),
|
||||
where(query->where),
|
||||
group_by(query->group_by),
|
||||
order_by(query->order_by),
|
||||
having(query->having)
|
||||
{ }
|
||||
~ha_calpont_group_by_handler() { }
|
||||
int init_scan();
|
||||
int next_row();
|
||||
int end_scan();
|
||||
|
||||
List<Item>* select;
|
||||
TABLE_LIST* table_list;
|
||||
bool distinct;
|
||||
Item* where;
|
||||
ORDER* group_by;
|
||||
ORDER* order_by;
|
||||
Item* having;
|
||||
};
|
||||
#endif //HA_CALPONT_H__
|
||||
|
||||
|
@@ -174,6 +174,10 @@ uint32_t convertDataType(int dataType)
|
||||
calpontDataType = CalpontSystemCatalog::DATETIME;
|
||||
break;
|
||||
|
||||
case ddlpackage::DDL_TIME:
|
||||
calpontDataType = CalpontSystemCatalog::TIME;
|
||||
break;
|
||||
|
||||
case ddlpackage::DDL_CLOB:
|
||||
calpontDataType = CalpontSystemCatalog::CLOB;
|
||||
break;
|
||||
@@ -1868,20 +1872,16 @@ int ProcessDDLStatement(string& ddlStatement, string& schema, const string& tabl
|
||||
if (ddlStatement.find("AUTO_INCREMENT") != string::npos)
|
||||
{
|
||||
thd->raise_error_printf(ER_CHECK_NOT_IMPLEMENTED, "Use of the MySQL auto_increment syntax is not supported in Columnstore. If you wish to create an auto increment column in Columnstore, please consult the Columnstore SQL Syntax Guide for the correct usage.");
|
||||
}
|
||||
// MCOL-867. MariaDB RENAME TABLE statement supports WAIT|NOWAIT options since 10.3.0 but Columnstore isn't yet.
|
||||
else if (ddlStatement.find("WAIT") != string::npos || ddlStatement.find("NOWAIT") != string::npos)
|
||||
{
|
||||
thd->raise_error_printf(ER_CHECK_NOT_IMPLEMENTED, "WAIT and NOWAIT options are not supported in Columnstore. Please consult the Columnstore SQL Syntax Guide for the correct usage.");
|
||||
ci->alterTableState = cal_connection_info::NOT_ALTER;
|
||||
ci->isAlter = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
//@Bug 1888,1885. update error message
|
||||
thd->raise_error_printf(ER_CHECK_NOT_IMPLEMENTED, "The syntax or the data type(s) is not supported by Columnstore. Please check the Columnstore syntax guide for supported syntax or data types.");
|
||||
ci->alterTableState = cal_connection_info::NOT_ALTER;
|
||||
ci->isAlter = false;
|
||||
}
|
||||
|
||||
ci->alterTableState = cal_connection_info::NOT_ALTER;
|
||||
ci->isAlter = false;
|
||||
}
|
||||
|
||||
return rc;
|
||||
@@ -2183,7 +2183,6 @@ int ha_calpont_impl_rename_table_(const char* from, const char* to, cal_connecti
|
||||
THD* thd = current_thd;
|
||||
string emsg;
|
||||
|
||||
ostringstream stmt1;
|
||||
pair<string, string> fromPair;
|
||||
pair<string, string> toPair;
|
||||
string stmt;
|
||||
@@ -2211,20 +2210,21 @@ int ha_calpont_impl_rename_table_(const char* from, const char* to, cal_connecti
|
||||
return -1;
|
||||
}
|
||||
|
||||
stmt1 << "alter table " << fromPair.second << " rename to " << toPair.second << ";";
|
||||
|
||||
stmt = stmt1.str();
|
||||
// This explicitely shields both db objects with quotes that the lexer strips down later.
|
||||
stmt = "alter table `" + fromPair.second + "` rename to `" + toPair.second + "`;";
|
||||
string db;
|
||||
|
||||
if ( fromPair.first.length() != 0 )
|
||||
db = fromPair.first;
|
||||
else if ( thd->db )
|
||||
if ( thd->db )
|
||||
db = thd->db;
|
||||
else if ( fromPair.first.length() != 0 )
|
||||
db = fromPair.first;
|
||||
else
|
||||
db = toPair.first;
|
||||
|
||||
int rc = ProcessDDLStatement(stmt, db, "", tid2sid(thd->thread_id), emsg);
|
||||
|
||||
if (rc != 0)
|
||||
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, emsg.c_str());
|
||||
push_warning(thd, Sql_condition::WARN_LEVEL_ERROR, 9999, emsg.c_str());
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
@@ -136,7 +136,8 @@ int buildBuffer(uchar* buf, string& buffer, int& columns, TABLE* table)
|
||||
(*field)->type() == MYSQL_TYPE_STRING ||
|
||||
(*field)->type() == MYSQL_TYPE_DATE ||
|
||||
(*field)->type() == MYSQL_TYPE_DATETIME ||
|
||||
(*field)->type() == MYSQL_TYPE_DATETIME2 )
|
||||
(*field)->type() == MYSQL_TYPE_DATETIME2 ||
|
||||
(*field)->type() == MYSQL_TYPE_TIME )
|
||||
vals.append("'");
|
||||
|
||||
while (ptr < end_ptr)
|
||||
@@ -166,7 +167,8 @@ int buildBuffer(uchar* buf, string& buffer, int& columns, TABLE* table)
|
||||
(*field)->type() == MYSQL_TYPE_STRING ||
|
||||
(*field)->type() == MYSQL_TYPE_DATE ||
|
||||
(*field)->type() == MYSQL_TYPE_DATETIME ||
|
||||
(*field)->type() == MYSQL_TYPE_DATETIME2 )
|
||||
(*field)->type() == MYSQL_TYPE_DATETIME2 ||
|
||||
(*field)->type() == MYSQL_TYPE_TIME )
|
||||
vals.append("'");
|
||||
}
|
||||
}
|
||||
@@ -838,11 +840,23 @@ int ha_calpont_impl_write_batch_row_(uchar* buf, TABLE* table, cal_impl_if::cal_
|
||||
// mariadb 10.1 compatibility -- MYSQL_TYPE_DATETIME2 introduced in mysql 5.6
|
||||
MYSQL_TIME ltime;
|
||||
const uchar* pos = buf;
|
||||
longlong tmp = my_datetime_packed_from_binary(pos, 0);
|
||||
longlong tmp = my_datetime_packed_from_binary(pos, table->field[colpos]->decimals());
|
||||
TIME_from_longlong_datetime_packed(<ime, tmp);
|
||||
fprintf(ci.filePtr, "%04d-%02d-%02d %02d:%02d:%02d%c",
|
||||
ltime.year, ltime.month, ltime.day,
|
||||
ltime.hour, ltime.minute, ltime.second, ci.delimiter);
|
||||
|
||||
if (!ltime.second_part)
|
||||
{
|
||||
fprintf(ci.filePtr, "%04d-%02d-%02d %02d:%02d:%02d%c",
|
||||
ltime.year, ltime.month, ltime.day,
|
||||
ltime.hour, ltime.minute, ltime.second, ci.delimiter);
|
||||
}
|
||||
else
|
||||
{
|
||||
fprintf(ci.filePtr, "%04d-%02d-%02d %02d:%02d:%02d.%ld%c",
|
||||
ltime.year, ltime.month, ltime.day,
|
||||
ltime.hour, ltime.minute, ltime.second,
|
||||
ltime.second_part, ci.delimiter);
|
||||
}
|
||||
|
||||
buf += table->field[colpos]->pack_length();
|
||||
}
|
||||
else
|
||||
@@ -866,6 +880,39 @@ int ha_calpont_impl_write_batch_row_(uchar* buf, TABLE* table, cal_impl_if::cal_
|
||||
break;
|
||||
}
|
||||
|
||||
case CalpontSystemCatalog::TIME:
|
||||
{
|
||||
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
|
||||
{
|
||||
fprintf(ci.filePtr, "%c", ci.delimiter);
|
||||
|
||||
buf += table->field[colpos]->pack_length();
|
||||
}
|
||||
else
|
||||
{
|
||||
MYSQL_TIME ltime;
|
||||
const uchar* pos = buf;
|
||||
longlong tmp = my_time_packed_from_binary(pos, table->field[colpos]->decimals());
|
||||
TIME_from_longlong_time_packed(<ime, tmp);
|
||||
|
||||
if (!ltime.second_part)
|
||||
{
|
||||
fprintf(ci.filePtr, "%02d:%02d:%02d%c",
|
||||
ltime.hour, ltime.minute, ltime.second, ci.delimiter);
|
||||
}
|
||||
else
|
||||
{
|
||||
fprintf(ci.filePtr, "%02d:%02d:%02d.%ld%c",
|
||||
ltime.hour, ltime.minute, ltime.second,
|
||||
ltime.second_part, ci.delimiter);
|
||||
}
|
||||
|
||||
buf += table->field[colpos]->pack_length();
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case CalpontSystemCatalog::CHAR:
|
||||
{
|
||||
if (nullVal && (ci.columnTypes[colpos].constraintType != CalpontSystemCatalog::NOTNULL_CONSTRAINT))
|
||||
|
File diff suppressed because it is too large
Load Diff
@@ -171,6 +171,7 @@ const unsigned NONSUPPORTED_ERR_THRESH = 2000;
|
||||
vector<RMParam> rmParms;
|
||||
ResourceManager* rm = ResourceManager::instance();
|
||||
bool useHdfs = rm->useHdfs();
|
||||
|
||||
//convenience fcn
|
||||
inline uint32_t tid2sid(const uint32_t tid)
|
||||
{
|
||||
@@ -275,6 +276,104 @@ void storeNumericField(Field** f, int64_t value, CalpontSystemCatalog::ColType&
|
||||
}
|
||||
}
|
||||
|
||||
void storeNumericFieldGroupBy(Field** f, int64_t value, CalpontSystemCatalog::ColType& ct)
|
||||
{
|
||||
// unset null bit first
|
||||
if ((*f)->null_ptr)
|
||||
*(*f)->null_ptr &= ~(*f)->null_bit;
|
||||
|
||||
// For unsigned, use the ColType returned in the row rather than the
|
||||
// unsigned_flag set by mysql. This is because mysql gets it wrong for SUM()
|
||||
// Hopefully, in all other cases we get it right.
|
||||
switch ((*f)->type())
|
||||
{
|
||||
case MYSQL_TYPE_NEWDECIMAL:
|
||||
{
|
||||
Field_new_decimal* f2 = (Field_new_decimal*)*f;
|
||||
|
||||
// @bug4388 stick to InfiniDB's scale in case mysql gives wrong scale due
|
||||
// to create vtable limitation.
|
||||
if (f2->dec < ct.scale)
|
||||
f2->dec = ct.scale;
|
||||
|
||||
char buf[256];
|
||||
dataconvert::DataConvert::decimalToString(value, (unsigned)ct.scale, buf, 256, ct.colDataType);
|
||||
f2->store(buf, strlen(buf), f2->charset());
|
||||
break;
|
||||
}
|
||||
|
||||
case MYSQL_TYPE_TINY: //TINYINT type
|
||||
{
|
||||
Field_tiny* f2 = (Field_tiny*)*f;
|
||||
longlong int_val = (longlong)value;
|
||||
f2->store(int_val, f2->unsigned_flag);
|
||||
break;
|
||||
}
|
||||
|
||||
case MYSQL_TYPE_SHORT: //SMALLINT type
|
||||
{
|
||||
Field_short* f2 = (Field_short*)*f;
|
||||
longlong int_val = (longlong)value;
|
||||
f2->store(int_val, f2->unsigned_flag);
|
||||
break;
|
||||
}
|
||||
|
||||
case MYSQL_TYPE_LONG: //INT type
|
||||
{
|
||||
Field_long* f2 = (Field_long*)*f;
|
||||
longlong int_val = (longlong)value;
|
||||
f2->store(int_val, f2->unsigned_flag);
|
||||
break;
|
||||
}
|
||||
|
||||
case MYSQL_TYPE_LONGLONG: //BIGINT type
|
||||
{
|
||||
Field_longlong* f2 = (Field_longlong*)*f;
|
||||
longlong int_val = (longlong)value;
|
||||
f2->store(int_val, f2->unsigned_flag);
|
||||
break;
|
||||
}
|
||||
|
||||
case MYSQL_TYPE_FLOAT: // FLOAT type
|
||||
{
|
||||
Field_float* f2 = (Field_float*)*f;
|
||||
float float_val = *(float*)(&value);
|
||||
f2->store(float_val);
|
||||
break;
|
||||
}
|
||||
|
||||
case MYSQL_TYPE_DOUBLE: // DOUBLE type
|
||||
{
|
||||
Field_double* f2 = (Field_double*)*f;
|
||||
double double_val = *(double*)(&value);
|
||||
f2->store(double_val);
|
||||
break;
|
||||
}
|
||||
|
||||
case MYSQL_TYPE_VARCHAR:
|
||||
{
|
||||
Field_varstring* f2 = (Field_varstring*)*f;
|
||||
char tmp[25];
|
||||
|
||||
if (ct.colDataType == CalpontSystemCatalog::DECIMAL)
|
||||
dataconvert::DataConvert::decimalToString(value, (unsigned)ct.scale, tmp, 25, ct.colDataType);
|
||||
else
|
||||
snprintf(tmp, 25, "%ld", value);
|
||||
|
||||
f2->store(tmp, strlen(tmp), f2->charset());
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
Field_longlong* f2 = (Field_longlong*)*f;
|
||||
longlong int_val = (longlong)value;
|
||||
f2->store(int_val, f2->unsigned_flag);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// @bug 2244. Log exception related to lost connection to ExeMgr.
|
||||
// Log exception error from calls to sm::tpl_scan_fetch in fetchNextRow()
|
||||
@@ -328,7 +427,7 @@ int vbin2hex(const uint8_t* p, const unsigned l, char* o)
|
||||
return 0;
|
||||
}
|
||||
|
||||
int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci)
|
||||
int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci, bool handler_flag = false)
|
||||
{
|
||||
int rc = HA_ERR_END_OF_FILE;
|
||||
int num_attr = ti.msTablePtr->s->fields;
|
||||
@@ -370,8 +469,15 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci)
|
||||
{
|
||||
Field** f;
|
||||
f = ti.msTablePtr->field;
|
||||
|
||||
//set all fields to null in null col bitmap
|
||||
memset(buf, -1, ti.msTablePtr->s->null_bytes);
|
||||
if (!handler_flag)
|
||||
memset(buf, -1, ti.msTablePtr->s->null_bytes);
|
||||
else
|
||||
{
|
||||
memset(ti.msTablePtr->null_flags, -1, ti.msTablePtr->s->null_bytes);
|
||||
}
|
||||
|
||||
std::vector<CalpontSystemCatalog::ColType>& colTypes = ti.tpl_scan_ctx->ctp;
|
||||
int64_t intColVal = 0;
|
||||
uint64_t uintColVal = 0;
|
||||
@@ -486,8 +592,21 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci)
|
||||
* based on the result set.
|
||||
*/
|
||||
/* MCOL-683: UTF-8 datetime no msecs is 57, this sometimes happens! */
|
||||
if (((*f)->field_length > 19) && ((*f)->field_length != 57))
|
||||
(*f)->field_length = strlen(tmp);
|
||||
// if (((*f)->field_length > 19) && ((*f)->field_length != 57))
|
||||
// (*f)->field_length = strlen(tmp);
|
||||
|
||||
Field_varstring* f2 = (Field_varstring*)*f;
|
||||
f2->store(tmp, strlen(tmp), f2->charset());
|
||||
break;
|
||||
}
|
||||
|
||||
case CalpontSystemCatalog::TIME:
|
||||
{
|
||||
if ((*f)->null_ptr)
|
||||
*(*f)->null_ptr &= ~(*f)->null_bit;
|
||||
|
||||
intColVal = row.getUintField<8>(s);
|
||||
DataConvert::timeToString(intColVal, tmp, 255);
|
||||
|
||||
Field_varstring* f2 = (Field_varstring*)*f;
|
||||
f2->store(tmp, strlen(tmp), f2->charset());
|
||||
@@ -4957,5 +5076,803 @@ int ha_calpont_impl_rnd_pos(uchar* buf, uchar* pos)
|
||||
return ER_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
// vim:sw=4 ts=4:
|
||||
/*@brief ha_calpont_impl_group_by_init - Get data for MariaDB group_by
|
||||
pushdown handler */
|
||||
/***********************************************************
|
||||
* DESCRIPTION:
|
||||
* Prepares data for group_by_handler::next_row() calls.
|
||||
* PARAMETERS:
|
||||
* group_hand - group by handler, that preserves initial table and items lists. .
|
||||
* table - TABLE pointer The table to save the result set into.
|
||||
* RETURN:
|
||||
* 0 if success
|
||||
* others if something went wrong whilst getting the result set
|
||||
***********************************************************/
|
||||
int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE* table)
|
||||
{
|
||||
string tableName = group_hand->table_list->table->s->table_name.str;
|
||||
IDEBUG( cout << "group_by_init for table " << tableName << endl );
|
||||
THD* thd = current_thd;
|
||||
|
||||
//check whether the system is ready to process statement.
|
||||
#ifndef _MSC_VER
|
||||
static DBRM dbrm(true);
|
||||
bool bSystemQueryReady = dbrm.getSystemQueryReady();
|
||||
|
||||
if (bSystemQueryReady == 0)
|
||||
{
|
||||
// Still not ready
|
||||
setError(thd, ER_INTERNAL_ERROR, "The system is not yet ready to accept queries");
|
||||
thd->infinidb_vtable.vtable_state = THD::INFINIDB_ERROR;
|
||||
return ER_INTERNAL_ERROR;
|
||||
}
|
||||
else if (bSystemQueryReady < 0)
|
||||
{
|
||||
// Still not ready
|
||||
setError(thd, ER_INTERNAL_ERROR, "DBRM is not responding. Cannot accept queries");
|
||||
thd->infinidb_vtable.vtable_state = THD::INFINIDB_ERROR;
|
||||
return ER_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
#endif
|
||||
// prevent "create table as select" from running on slave
|
||||
thd->infinidb_vtable.hasInfiniDBTable = true;
|
||||
|
||||
// return error if error status has been already set
|
||||
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ERROR)
|
||||
return ER_INTERNAL_ERROR;
|
||||
|
||||
// MCOL-1052
|
||||
// by pass the extra union trips. return 0
|
||||
//if (thd->infinidb_vtable.isUnion && thd->infinidb_vtable.vtable_state == THD::INFINIDB_CREATE_VTABLE)
|
||||
// return 0;
|
||||
|
||||
// @bug 2232. Basic SP support. Error out non support sp cases.
|
||||
// @bug 3939. Only error out for sp with select. Let pass for alter table in sp.
|
||||
if (thd->infinidb_vtable.call_sp && (thd->lex)->sql_command != SQLCOM_ALTER_TABLE)
|
||||
{
|
||||
setError(thd, ER_CHECK_NOT_IMPLEMENTED, "This stored procedure syntax is not supported by Columnstore in this version");
|
||||
thd->infinidb_vtable.vtable_state = THD::INFINIDB_ERROR;
|
||||
return ER_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
uint32_t sessionID = tid2sid(thd->thread_id);
|
||||
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
|
||||
csc->identity(CalpontSystemCatalog::FE);
|
||||
|
||||
if (!thd->infinidb_vtable.cal_conn_info)
|
||||
thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
|
||||
|
||||
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
|
||||
|
||||
idbassert(ci != 0);
|
||||
|
||||
|
||||
// MySQL sometimes calls rnd_init multiple times, plan should only be
|
||||
// generated and sent once.
|
||||
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE &&
|
||||
!thd->infinidb_vtable.isNewQuery)
|
||||
return 0;
|
||||
|
||||
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
|
||||
{
|
||||
if (ci->cal_conn_hndl)
|
||||
{
|
||||
// send ExeMgr a signal before closing the connection
|
||||
ByteStream msg;
|
||||
ByteStream::quadbyte qb = 0;
|
||||
msg << qb;
|
||||
|
||||
try
|
||||
{
|
||||
ci->cal_conn_hndl->exeMgr->write(msg);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
// canceling query. ignore connection failure.
|
||||
}
|
||||
|
||||
sm::sm_cleanup(ci->cal_conn_hndl);
|
||||
ci->cal_conn_hndl = 0;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
sm::tableid_t tableid = 0;
|
||||
cal_table_info ti;
|
||||
cal_group_info gi;
|
||||
sm::cpsm_conhdl_t* hndl;
|
||||
SCSEP csep;
|
||||
|
||||
bool localQuery = (thd->variables.infinidb_local_query > 0 ? true : false);
|
||||
|
||||
{
|
||||
ci->stats.reset(); // reset query stats
|
||||
ci->stats.setStartTime();
|
||||
ci->stats.fUser = thd->main_security_ctx.user;
|
||||
|
||||
if (thd->main_security_ctx.host)
|
||||
ci->stats.fHost = thd->main_security_ctx.host;
|
||||
else if (thd->main_security_ctx.host_or_ip)
|
||||
ci->stats.fHost = thd->main_security_ctx.host_or_ip;
|
||||
else
|
||||
ci->stats.fHost = "unknown";
|
||||
|
||||
try
|
||||
{
|
||||
ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser);
|
||||
}
|
||||
catch (std::exception& e)
|
||||
{
|
||||
string msg = string("Columnstore User Priority - ") + e.what();
|
||||
ci->warningMsg = msg;
|
||||
}
|
||||
|
||||
// if the previous query has error, re-establish the connection
|
||||
if (ci->queryState != 0)
|
||||
{
|
||||
sm::sm_cleanup(ci->cal_conn_hndl);
|
||||
ci->cal_conn_hndl = 0;
|
||||
}
|
||||
|
||||
sm::sm_init(sessionID, &ci->cal_conn_hndl, localQuery);
|
||||
idbassert(ci->cal_conn_hndl != 0);
|
||||
ci->cal_conn_hndl->csc = csc;
|
||||
idbassert(ci->cal_conn_hndl->exeMgr != 0);
|
||||
|
||||
try
|
||||
{
|
||||
ci->cal_conn_hndl->connect();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR));
|
||||
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
|
||||
goto error;
|
||||
}
|
||||
|
||||
hndl = ci->cal_conn_hndl;
|
||||
|
||||
if (!csep)
|
||||
csep.reset(new CalpontSelectExecutionPlan());
|
||||
|
||||
SessionManager sm;
|
||||
BRM::TxnID txnID;
|
||||
txnID = sm.getTxnID(sessionID);
|
||||
|
||||
if (!txnID.valid)
|
||||
{
|
||||
txnID.id = 0;
|
||||
txnID.valid = true;
|
||||
}
|
||||
|
||||
QueryContext verID;
|
||||
verID = sm.verID();
|
||||
|
||||
csep->txnID(txnID.id);
|
||||
csep->verID(verID);
|
||||
csep->sessionID(sessionID);
|
||||
|
||||
if (group_hand->table_list->db_length)
|
||||
csep->schemaName(group_hand->table_list->db);
|
||||
|
||||
csep->traceFlags(ci->traceFlags);
|
||||
|
||||
// MCOL-1052 Send Items lists down to the optimizer.
|
||||
gi.groupByTables = group_hand->table_list;
|
||||
gi.groupByFields = group_hand->select;
|
||||
gi.groupByWhere = group_hand->where;
|
||||
gi.groupByGroup = group_hand->group_by;
|
||||
gi.groupByOrder = group_hand->order_by;
|
||||
gi.groupByHaving = group_hand->having;
|
||||
gi.groupByDistinct = group_hand->distinct;
|
||||
|
||||
// MCOL-1052 Send pushed conditions here, since server could omit GROUP BY
|
||||
// items in case of = or IN functions used on GROUP BY columns.
|
||||
{
|
||||
CalTableMap::iterator mapiter;
|
||||
execplan::CalpontSelectExecutionPlan::ColumnMap::iterator colMapIter;
|
||||
execplan::CalpontSelectExecutionPlan::ColumnMap::iterator condColMapIter;
|
||||
execplan::ParseTree* ptIt;
|
||||
execplan::ReturnedColumn* rcIt;
|
||||
|
||||
for (TABLE_LIST* tl = gi.groupByTables; tl; tl = tl->next_local)
|
||||
{
|
||||
mapiter = ci->tableMap.find(tl->table);
|
||||
|
||||
if (mapiter != ci->tableMap.end() && mapiter->second.condInfo != NULL
|
||||
&& mapiter->second.condInfo->condPush)
|
||||
{
|
||||
while (!mapiter->second.condInfo->ptWorkStack.empty())
|
||||
{
|
||||
ptIt = mapiter->second.condInfo->ptWorkStack.top();
|
||||
mapiter->second.condInfo->ptWorkStack.pop();
|
||||
gi.pushedPts.push_back(ptIt);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// send plan whenever group_init is called
|
||||
int status = cp_get_group_plan(thd, csep, gi);
|
||||
|
||||
if (status > 0)
|
||||
goto internal_error;
|
||||
else if (status < 0)
|
||||
return 0;
|
||||
|
||||
// @bug 2547. don't need to send the plan if it's impossible where for all unions.
|
||||
if (thd->infinidb_vtable.impossibleWhereOnUnion)
|
||||
return 0;
|
||||
|
||||
string query;
|
||||
query.assign(thd->infinidb_vtable.original_query.ptr(),
|
||||
thd->infinidb_vtable.original_query.length());
|
||||
csep->data(query);
|
||||
|
||||
try
|
||||
{
|
||||
csep->priority( ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser));
|
||||
}
|
||||
catch (std::exception& e)
|
||||
{
|
||||
string msg = string("Columnstore User Priority - ") + e.what();
|
||||
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
|
||||
}
|
||||
|
||||
#ifdef PLAN_HEX_FILE
|
||||
// plan serialization
|
||||
ifstream ifs("/tmp/li1-plan.hex");
|
||||
ByteStream bs1;
|
||||
ifs >> bs1;
|
||||
ifs.close();
|
||||
csep->unserialize(bs1);
|
||||
#endif
|
||||
|
||||
if (ci->traceFlags & 1)
|
||||
{
|
||||
cerr << "---------------- EXECUTION PLAN ----------------" << endl;
|
||||
cerr << *csep << endl ;
|
||||
cerr << "-------------- EXECUTION PLAN END --------------\n" << endl;
|
||||
}
|
||||
else
|
||||
{
|
||||
IDEBUG( cout << "---------------- EXECUTION PLAN ----------------" << endl );
|
||||
IDEBUG( cerr << *csep << endl );
|
||||
IDEBUG( cout << "-------------- EXECUTION PLAN END --------------\n" << endl );
|
||||
}
|
||||
}// end of execution plan generation
|
||||
|
||||
{
|
||||
ByteStream msg;
|
||||
ByteStream emsgBs;
|
||||
|
||||
while (true)
|
||||
{
|
||||
try
|
||||
{
|
||||
ByteStream::quadbyte qb = 4;
|
||||
msg << qb;
|
||||
hndl->exeMgr->write(msg);
|
||||
msg.restart();
|
||||
csep->rmParms(rmParms);
|
||||
|
||||
//send plan
|
||||
csep->serialize(msg);
|
||||
hndl->exeMgr->write(msg);
|
||||
|
||||
//get ExeMgr status back to indicate a vtable joblist success or not
|
||||
msg.restart();
|
||||
emsgBs.restart();
|
||||
msg = hndl->exeMgr->read();
|
||||
emsgBs = hndl->exeMgr->read();
|
||||
string emsg;
|
||||
|
||||
if (msg.length() == 0 || emsgBs.length() == 0)
|
||||
{
|
||||
emsg = "Lost connection to ExeMgr. Please contact your administrator";
|
||||
setError(thd, ER_INTERNAL_ERROR, emsg);
|
||||
return ER_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
string emsgStr;
|
||||
emsgBs >> emsgStr;
|
||||
bool err = false;
|
||||
|
||||
if (msg.length() == 4)
|
||||
{
|
||||
msg >> qb;
|
||||
|
||||
if (qb != 0)
|
||||
{
|
||||
err = true;
|
||||
// for makejoblist error, stats contains only error code and insert from here
|
||||
// because table fetch is not started
|
||||
ci->stats.setEndTime();
|
||||
ci->stats.fQuery = csep->data();
|
||||
ci->stats.fQueryType = csep->queryType();
|
||||
ci->stats.fErrorNo = qb;
|
||||
|
||||
try
|
||||
{
|
||||
ci->stats.insert();
|
||||
}
|
||||
catch (std::exception& e)
|
||||
{
|
||||
string msg = string("Columnstore Query Stats - ") + e.what();
|
||||
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
err = true;
|
||||
}
|
||||
|
||||
if (err)
|
||||
{
|
||||
setError(thd, ER_INTERNAL_ERROR, emsgStr);
|
||||
return ER_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
rmParms.clear();
|
||||
|
||||
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE)
|
||||
{
|
||||
ci->tableMap[table] = ti;
|
||||
}
|
||||
else
|
||||
{
|
||||
ci->queryState = 1;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
sm::sm_cleanup(hndl);
|
||||
hndl = 0;
|
||||
|
||||
sm::sm_init(sessionID, &hndl, localQuery);
|
||||
idbassert(hndl != 0);
|
||||
hndl->csc = csc;
|
||||
|
||||
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE)
|
||||
ti.conn_hndl = hndl;
|
||||
else
|
||||
ci->cal_conn_hndl = hndl;
|
||||
|
||||
try
|
||||
{
|
||||
hndl->connect();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR));
|
||||
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
|
||||
goto error;
|
||||
}
|
||||
|
||||
msg.restart();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// set query state to be in_process. Sometimes mysql calls rnd_init multiple
|
||||
// times, this makes sure plan only being generated and sent once. It will be
|
||||
// reset when query finishes in sm::end_query
|
||||
thd->infinidb_vtable.isNewQuery = false;
|
||||
|
||||
// common path for both vtable select phase and table mode -- open scan handle
|
||||
ti = ci->tableMap[table];
|
||||
ti.msTablePtr = table;
|
||||
|
||||
// MCOL-1052
|
||||
thd->infinidb_vtable.vtable_state = THD::INFINIDB_SELECT_VTABLE;
|
||||
|
||||
if ((thd->infinidb_vtable.vtable_state == THD::INFINIDB_SELECT_VTABLE) ||
|
||||
(thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) ||
|
||||
(thd->infinidb_vtable.vtable_state == THD::INFINIDB_REDO_QUERY))
|
||||
{
|
||||
if (ti.tpl_ctx == 0)
|
||||
{
|
||||
ti.tpl_ctx = new sm::cpsm_tplh_t();
|
||||
ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t());
|
||||
}
|
||||
|
||||
// make sure rowgroup is null so the new meta data can be taken. This is for some case mysql
|
||||
// call rnd_init for a table more than once.
|
||||
ti.tpl_scan_ctx->rowGroup = NULL;
|
||||
|
||||
try
|
||||
{
|
||||
tableid = execplan::IDB_VTABLE_ID;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
string emsg = "No table ID found for table " + string(table->s->table_name.str);
|
||||
setError(thd, ER_INTERNAL_ERROR, emsg);
|
||||
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
|
||||
goto internal_error;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
sm::tpl_open(tableid, ti.tpl_ctx, hndl);
|
||||
sm::tpl_scan_open(tableid, ti.tpl_scan_ctx, hndl);
|
||||
}
|
||||
catch (std::exception& e)
|
||||
{
|
||||
string emsg = "table can not be opened: " + string(e.what());
|
||||
setError(thd, ER_INTERNAL_ERROR, emsg);
|
||||
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
|
||||
goto internal_error;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
string emsg = "table can not be opened";
|
||||
setError(thd, ER_INTERNAL_ERROR, emsg);
|
||||
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
|
||||
goto internal_error;
|
||||
}
|
||||
|
||||
ti.tpl_scan_ctx->traceFlags = ci->traceFlags;
|
||||
|
||||
if ((ti.tpl_scan_ctx->ctp).size() == 0)
|
||||
{
|
||||
uint32_t num_attr = table->s->fields;
|
||||
|
||||
for (uint32_t i = 0; i < num_attr; i++)
|
||||
{
|
||||
CalpontSystemCatalog::ColType ctype;
|
||||
ti.tpl_scan_ctx->ctp.push_back(ctype);
|
||||
}
|
||||
|
||||
// populate coltypes here for table mode because tableband gives treeoid for dictionary column
|
||||
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE)
|
||||
{
|
||||
CalpontSystemCatalog::RIDList oidlist = csc->columnRIDs(make_table(table->s->db.str, table->s->table_name.str), true);
|
||||
|
||||
if (oidlist.size() != num_attr)
|
||||
{
|
||||
string emsg = "Size mismatch probably caused by front end out of sync";
|
||||
setError(thd, ER_INTERNAL_ERROR, emsg);
|
||||
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
|
||||
goto internal_error;
|
||||
}
|
||||
|
||||
for (unsigned int j = 0; j < oidlist.size(); j++)
|
||||
{
|
||||
CalpontSystemCatalog::ColType ctype = csc->colType(oidlist[j].objnum);
|
||||
ti.tpl_scan_ctx->ctp[ctype.colPosition] = ctype;
|
||||
ti.tpl_scan_ctx->ctp[ctype.colPosition].colPosition = -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ci->tableMap[table] = ti;
|
||||
return 0;
|
||||
|
||||
error:
|
||||
|
||||
if (ci->cal_conn_hndl)
|
||||
{
|
||||
sm::sm_cleanup(ci->cal_conn_hndl);
|
||||
ci->cal_conn_hndl = 0;
|
||||
}
|
||||
|
||||
// do we need to close all connection handle of the table map?
|
||||
return ER_INTERNAL_ERROR;
|
||||
|
||||
internal_error:
|
||||
|
||||
if (ci->cal_conn_hndl)
|
||||
{
|
||||
sm::sm_cleanup(ci->cal_conn_hndl);
|
||||
ci->cal_conn_hndl = 0;
|
||||
}
|
||||
|
||||
return ER_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
/*@brief ha_calpont_impl_group_by_next - Return result set for MariaDB group_by
|
||||
pushdown handler
|
||||
*/
|
||||
/***********************************************************
|
||||
* DESCRIPTION:
|
||||
* Return a result record for each group_by_handler::next_row() call.
|
||||
* PARAMETERS:
|
||||
* group_hand - group by handler, that preserves initial table and items lists. .
|
||||
* table - TABLE pointer The table to save the result set in.
|
||||
* RETURN:
|
||||
* 0 if success
|
||||
* HA_ERR_END_OF_FILE if the record set has come to an end
|
||||
* others if something went wrong whilst getting the result set
|
||||
***********************************************************/
|
||||
int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE* table)
|
||||
{
|
||||
THD* thd = current_thd;
|
||||
|
||||
/* If this node is the slave, ignore DML to IDB tables */
|
||||
if (thd->slave_thread && (
|
||||
thd->lex->sql_command == SQLCOM_INSERT ||
|
||||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
|
||||
thd->lex->sql_command == SQLCOM_UPDATE ||
|
||||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
|
||||
thd->lex->sql_command == SQLCOM_DELETE ||
|
||||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
|
||||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
|
||||
thd->lex->sql_command == SQLCOM_LOAD))
|
||||
return 0;
|
||||
|
||||
|
||||
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ERROR)
|
||||
return ER_INTERNAL_ERROR;
|
||||
|
||||
if (((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) ||
|
||||
((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
|
||||
return HA_ERR_END_OF_FILE;
|
||||
|
||||
// @bug 2547
|
||||
if (thd->infinidb_vtable.impossibleWhereOnUnion)
|
||||
return HA_ERR_END_OF_FILE;
|
||||
|
||||
// @bug 2232. Basic SP support
|
||||
// @bug 3939. Only error out for sp with select. Let pass for alter table in sp.
|
||||
/*if (thd->infinidb_vtable.call_sp && (thd->lex)->sql_command != SQLCOM_ALTER_TABLE)
|
||||
{
|
||||
setError(thd, ER_CHECK_NOT_IMPLEMENTED, "This stored procedure syntax is not supported by Columnstore in this version");
|
||||
thd->infinidb_vtable.vtable_state = THD::INFINIDB_ERROR;
|
||||
return ER_INTERNAL_ERROR;
|
||||
}
|
||||
*/
|
||||
if (!thd->infinidb_vtable.cal_conn_info)
|
||||
thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
|
||||
|
||||
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
|
||||
|
||||
// @bug 3078
|
||||
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
|
||||
{
|
||||
if (ci->cal_conn_hndl)
|
||||
{
|
||||
// send ExeMgr a signal before cloing the connection
|
||||
ByteStream msg;
|
||||
ByteStream::quadbyte qb = 0;
|
||||
msg << qb;
|
||||
|
||||
try
|
||||
{
|
||||
ci->cal_conn_hndl->exeMgr->write(msg);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
// cancel query. ignore connection failure.
|
||||
}
|
||||
|
||||
sm::sm_cleanup(ci->cal_conn_hndl);
|
||||
ci->cal_conn_hndl = 0;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (ci->alterTableState > 0) return HA_ERR_END_OF_FILE;
|
||||
|
||||
cal_table_info ti;
|
||||
ti = ci->tableMap[table];
|
||||
int rc = HA_ERR_END_OF_FILE;
|
||||
|
||||
if (!ti.tpl_ctx || !ti.tpl_scan_ctx)
|
||||
{
|
||||
CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
|
||||
return ER_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
idbassert(ti.msTablePtr == table);
|
||||
|
||||
try
|
||||
{
|
||||
// fetchNextRow interface forces to use buf.
|
||||
unsigned char buf;
|
||||
rc = fetchNextRow(&buf, ti, ci, true);
|
||||
}
|
||||
catch (std::exception& e)
|
||||
{
|
||||
string emsg = string("Error while fetching from ExeMgr: ") + e.what();
|
||||
setError(thd, ER_INTERNAL_ERROR, emsg);
|
||||
CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
|
||||
return ER_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
ci->tableMap[table] = ti;
|
||||
|
||||
if (rc != 0 && rc != HA_ERR_END_OF_FILE)
|
||||
{
|
||||
string emsg;
|
||||
|
||||
// remove this check when all error handling migrated to the new framework.
|
||||
if (rc >= 1000)
|
||||
emsg = ti.tpl_scan_ctx->errMsg;
|
||||
else
|
||||
{
|
||||
logging::ErrorCodes errorcodes;
|
||||
emsg = errorcodes.errorString(rc);
|
||||
}
|
||||
|
||||
setError(thd, ER_INTERNAL_ERROR, emsg);
|
||||
ci->stats.fErrorNo = rc;
|
||||
CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
|
||||
rc = ER_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* table)
|
||||
{
|
||||
int rc = 0;
|
||||
THD* thd = current_thd;
|
||||
cal_connection_info* ci = NULL;
|
||||
|
||||
|
||||
if (thd->slave_thread && (
|
||||
thd->lex->sql_command == SQLCOM_INSERT ||
|
||||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
|
||||
thd->lex->sql_command == SQLCOM_UPDATE ||
|
||||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
|
||||
thd->lex->sql_command == SQLCOM_DELETE ||
|
||||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
|
||||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
|
||||
thd->lex->sql_command == SQLCOM_LOAD))
|
||||
return 0;
|
||||
|
||||
thd->infinidb_vtable.isNewQuery = true;
|
||||
thd->infinidb_vtable.isUnion = false;
|
||||
|
||||
if (thd->infinidb_vtable.cal_conn_info)
|
||||
ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
|
||||
|
||||
// MCOL-1052
|
||||
//if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ORDER_BY )
|
||||
//{
|
||||
// thd->infinidb_vtable.vtable_state = THD::INFINIDB_SELECT_VTABLE;// flip back to normal state
|
||||
// return rc;
|
||||
//}
|
||||
|
||||
if (((thd->lex)->sql_command == SQLCOM_INSERT) ||
|
||||
((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) )
|
||||
{
|
||||
// @bug 4022. error handling for select part of dml
|
||||
if (ci->cal_conn_hndl && ci->rc)
|
||||
{
|
||||
// send ExeMgr a signal before closing the connection
|
||||
ByteStream msg;
|
||||
ByteStream::quadbyte qb = 0;
|
||||
msg << qb;
|
||||
|
||||
try
|
||||
{
|
||||
ci->cal_conn_hndl->exeMgr->write(msg);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
// this is error handling, so ignore connection failure.
|
||||
}
|
||||
|
||||
sm::sm_cleanup(ci->cal_conn_hndl);
|
||||
ci->cal_conn_hndl = 0;
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
|
||||
if (!ci)
|
||||
{
|
||||
thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
|
||||
ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
|
||||
}
|
||||
|
||||
// @bug 3078. Also session limit variable works the same as ctrl+c
|
||||
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD ||
|
||||
((thd->lex)->sql_command != SQLCOM_INSERT &&
|
||||
(thd->lex)->sql_command != SQLCOM_INSERT_SELECT &&
|
||||
thd->variables.select_limit != (uint64_t) - 1))
|
||||
{
|
||||
if (ci->cal_conn_hndl)
|
||||
{
|
||||
// send ExeMgr a signal before closing the connection
|
||||
ByteStream msg;
|
||||
ByteStream::quadbyte qb = 0;
|
||||
msg << qb;
|
||||
|
||||
try
|
||||
{
|
||||
ci->cal_conn_hndl->exeMgr->write(msg);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
// this is the end of query. Ignore the exception if exemgr connection failed
|
||||
// for whatever reason.
|
||||
}
|
||||
|
||||
sm::sm_cleanup(ci->cal_conn_hndl);
|
||||
ci->cal_conn_hndl = 0;
|
||||
// clear querystats because no query stats available for cancelled query
|
||||
ci->queryStats = "";
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
IDEBUG( cerr << "group_by_end for table " << table->s->table_name.str << endl );
|
||||
|
||||
cal_table_info ti = ci->tableMap[table];
|
||||
sm::cpsm_conhdl_t* hndl;
|
||||
|
||||
hndl = ci->cal_conn_hndl;
|
||||
|
||||
if (ti.tpl_ctx)
|
||||
{
|
||||
if (ti.tpl_scan_ctx.get())
|
||||
{
|
||||
try
|
||||
{
|
||||
sm::tpl_scan_close(ti.tpl_scan_ctx);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
rc = ER_INTERNAL_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
ti.tpl_scan_ctx.reset();
|
||||
|
||||
try
|
||||
{
|
||||
sm::tpl_close(ti.tpl_ctx, &hndl, ci->stats);
|
||||
|
||||
ci->cal_conn_hndl = hndl;
|
||||
|
||||
ti.tpl_ctx = 0;
|
||||
}
|
||||
catch (IDBExcept& e)
|
||||
{
|
||||
if (e.errorCode() == ERR_CROSS_ENGINE_CONNECT || e.errorCode() == ERR_CROSS_ENGINE_CONFIG)
|
||||
{
|
||||
string msg = string("Columnstore Query Stats - ") + e.what();
|
||||
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
|
||||
}
|
||||
else
|
||||
{
|
||||
setError(thd, ER_INTERNAL_ERROR, e.what());
|
||||
rc = ER_INTERNAL_ERROR;
|
||||
}
|
||||
}
|
||||
catch (std::exception& e)
|
||||
{
|
||||
setError(thd, ER_INTERNAL_ERROR, e.what());
|
||||
rc = ER_INTERNAL_ERROR;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
setError(thd, ER_INTERNAL_ERROR, "Internal error throwed in group_by_end");
|
||||
rc = ER_INTERNAL_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
ti.tpl_ctx = 0;
|
||||
|
||||
ci->tableMap[table] = ti;
|
||||
|
||||
// push warnings from CREATE phase
|
||||
if (!ci->warningMsg.empty())
|
||||
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, ci->warningMsg.c_str());
|
||||
|
||||
ci->warningMsg.clear();
|
||||
// reset expressionId just in case
|
||||
ci->expressionId = 0;
|
||||
return rc;
|
||||
}
|
||||
|
||||
// vim:sw=4 ts=4:
|
||||
|
@@ -48,11 +48,16 @@ extern int ha_calpont_impl_external_lock(THD* thd, TABLE* table, int lock_type);
|
||||
extern int ha_calpont_impl_update_row();
|
||||
extern int ha_calpont_impl_delete_row();
|
||||
extern int ha_calpont_impl_rnd_pos(uchar* buf, uchar* pos);
|
||||
extern int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE* table);
|
||||
extern int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE* table);
|
||||
extern int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* table);
|
||||
|
||||
#endif
|
||||
|
||||
#ifdef NEED_CALPONT_INTERFACE
|
||||
#include "ha_calpont_impl_if.h"
|
||||
#include "calpontsystemcatalog.h"
|
||||
#include "ha_calpont.h"
|
||||
extern int ha_calpont_impl_rename_table_(const char* from, const char* to, cal_impl_if::cal_connection_info& ci);
|
||||
extern int ha_calpont_impl_write_row_(uchar* buf, TABLE* table, cal_impl_if::cal_connection_info& ci, ha_rows& rowsInserted);
|
||||
extern int ha_calpont_impl_write_batch_row_(uchar* buf, TABLE* table, cal_impl_if::cal_connection_info& ci);
|
||||
@@ -69,6 +74,9 @@ extern std::string ha_calpont_impl_droppartition_ (execplan::CalpontSystemCatal
|
||||
|
||||
extern std::string ha_calpont_impl_viewtablelock( cal_impl_if::cal_connection_info& ci, execplan::CalpontSystemCatalog::TableName& tablename);
|
||||
extern std::string ha_calpont_impl_cleartablelock( cal_impl_if::cal_connection_info& ci, uint64_t tableLockID);
|
||||
extern int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE* table);
|
||||
extern int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE* table);
|
||||
extern int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* table);
|
||||
#endif
|
||||
|
||||
#endif
|
||||
|
@@ -99,6 +99,7 @@ struct gp_walk_info
|
||||
execplan::CalpontSelectExecutionPlan::ReturnedColumnList groupByCols;
|
||||
execplan::CalpontSelectExecutionPlan::ReturnedColumnList subGroupByCols;
|
||||
execplan::CalpontSelectExecutionPlan::ReturnedColumnList orderByCols;
|
||||
std::vector <Item*> havingAggColsItems;
|
||||
execplan::CalpontSelectExecutionPlan::ColumnMap columnMap;
|
||||
// This vector temporarily hold the projection columns to be added
|
||||
// to the returnedCols vector for subquery processing. It will be appended
|
||||
@@ -191,6 +192,28 @@ struct cal_table_info
|
||||
bool moreRows; //are there more rows to consume (b/c of limit)
|
||||
};
|
||||
|
||||
struct cal_group_info
|
||||
{
|
||||
cal_group_info() : groupByFields(0),
|
||||
groupByTables(0),
|
||||
groupByWhere(0),
|
||||
groupByGroup(0),
|
||||
groupByOrder(0),
|
||||
groupByHaving(0),
|
||||
groupByDistinct(false)
|
||||
{ }
|
||||
~cal_group_info() { }
|
||||
|
||||
List<Item>* groupByFields; // MCOL-1052 SELECT
|
||||
TABLE_LIST* groupByTables; // MCOL-1052 FROM
|
||||
Item* groupByWhere; // MCOL-1052 WHERE
|
||||
ORDER* groupByGroup; // MCOL-1052 GROUP BY
|
||||
ORDER* groupByOrder; // MCOL-1052 ORDER BY
|
||||
Item* groupByHaving; // MCOL-1052 HAVING
|
||||
bool groupByDistinct; //MCOL-1052 DISTINCT
|
||||
std::vector<execplan::ParseTree*> pushedPts;
|
||||
};
|
||||
|
||||
typedef std::tr1::unordered_map<TABLE*, cal_table_info> CalTableMap;
|
||||
typedef std::vector<std::string> ColValuesList;
|
||||
typedef std::vector<std::string> ColNameList;
|
||||
@@ -297,7 +320,9 @@ const std::string infinidb_err_msg = "\nThe query includes syntax that is not su
|
||||
|
||||
int cp_get_plan(THD* thd, execplan::SCSEP& csep);
|
||||
int cp_get_table_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_table_info& ti);
|
||||
int cp_get_group_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_group_info& gi);
|
||||
int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, bool isUnion = false);
|
||||
int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, cal_group_info& gi, bool isUnion = false);
|
||||
void setError(THD* thd, uint32_t errcode, const std::string errmsg, gp_walk_info* gwi);
|
||||
void setError(THD* thd, uint32_t errcode, const std::string errmsg);
|
||||
void gp_walk(const Item* item, void* arg);
|
||||
|
@@ -125,6 +125,9 @@ string name(CalpontSystemCatalog::ColType& ct)
|
||||
case CalpontSystemCatalog::DATETIME:
|
||||
return "DATETIME";
|
||||
|
||||
case CalpontSystemCatalog::TIME:
|
||||
return "TIME";
|
||||
|
||||
case CalpontSystemCatalog::DECIMAL:
|
||||
return "DECIMAL";
|
||||
|
||||
@@ -201,6 +204,7 @@ bool CP_type(CalpontSystemCatalog::ColType& ct)
|
||||
ct.colDataType == CalpontSystemCatalog::BIGINT ||
|
||||
ct.colDataType == CalpontSystemCatalog::DATE ||
|
||||
ct.colDataType == CalpontSystemCatalog::DATETIME ||
|
||||
ct.colDataType == CalpontSystemCatalog::TIME ||
|
||||
ct.colDataType == CalpontSystemCatalog::DECIMAL ||
|
||||
ct.colDataType == CalpontSystemCatalog::UTINYINT ||
|
||||
ct.colDataType == CalpontSystemCatalog::USMALLINT ||
|
||||
@@ -261,6 +265,9 @@ const string format(int64_t v, CalpontSystemCatalog::ColType& ct)
|
||||
oss << DataConvert::datetimeToString(v);
|
||||
break;
|
||||
|
||||
case CalpontSystemCatalog::TIME:
|
||||
oss << DataConvert::timeToString(v);
|
||||
|
||||
case CalpontSystemCatalog::CHAR:
|
||||
case CalpontSystemCatalog::VARCHAR:
|
||||
{
|
||||
@@ -387,6 +394,10 @@ const int64_t IDB_format(char* str, CalpontSystemCatalog::ColType& ct, uint8_t&
|
||||
v = boost::any_cast<uint64_t>(anyVal);
|
||||
break;
|
||||
|
||||
case CalpontSystemCatalog::TIME:
|
||||
v = boost::any_cast<int64_t>(anyVal);
|
||||
break;
|
||||
|
||||
case CalpontSystemCatalog::DECIMAL:
|
||||
case CalpontSystemCatalog::UDECIMAL:
|
||||
if (ct.colWidth == execplan::CalpontSystemCatalog::ONE_BYTE)
|
||||
|
@@ -513,6 +513,19 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n
|
||||
Item* orderItem = *(orderCol->item);
|
||||
srcp.reset(buildReturnedColumn(orderItem, gwi, nonSupport));
|
||||
|
||||
// MCOL-1052 GROUP BY handler has all of query's agg Items
|
||||
// as field and correlates them with its extended SELECT Items.
|
||||
if (!srcp)
|
||||
{
|
||||
orderItem = orderCol->item_ptr;
|
||||
|
||||
if (orderItem)
|
||||
{
|
||||
gwi.fatalParseError = false;
|
||||
srcp.reset(buildReturnedColumn(orderItem, gwi, nonSupport));
|
||||
}
|
||||
}
|
||||
|
||||
if (!srcp)
|
||||
return nullOnError(gwi);
|
||||
|
||||
@@ -590,6 +603,7 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n
|
||||
|
||||
case CalpontSystemCatalog::DATE:
|
||||
case CalpontSystemCatalog::DATETIME:
|
||||
case CalpontSystemCatalog::TIME:
|
||||
if (!frm.fIsRange)
|
||||
boundTypeErr = true;
|
||||
else if (dynamic_cast<IntervalColumn*>(frm.fStart.fVal.get()) == NULL)
|
||||
@@ -641,6 +655,7 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n
|
||||
|
||||
case CalpontSystemCatalog::DATE:
|
||||
case CalpontSystemCatalog::DATETIME:
|
||||
case CalpontSystemCatalog::TIME:
|
||||
if (!frm.fIsRange)
|
||||
boundTypeErr = true;
|
||||
else if (dynamic_cast<IntervalColumn*>(frm.fEnd.fVal.get()) == NULL)
|
||||
|
@@ -27,6 +27,8 @@
|
||||
#include <boost/shared_ptr.hpp>
|
||||
#include "calpontsystemcatalog.h"
|
||||
#include "dataconvert.h"
|
||||
#include "exceptclasses.h"
|
||||
using namespace logging;
|
||||
|
||||
|
||||
// Required declaration as it isn't in a MairaDB include
|
||||
@@ -70,7 +72,25 @@ static int is_columnstore_columns_fill(THD* thd, TABLE_LIST* tables, COND* cond)
|
||||
for (std::vector<std::pair<execplan::CalpontSystemCatalog::OID, execplan::CalpontSystemCatalog::TableName> >::const_iterator it = catalog_tables.begin();
|
||||
it != catalog_tables.end(); ++it)
|
||||
{
|
||||
execplan::CalpontSystemCatalog::RIDList column_rid_list = systemCatalogPtr->columnRIDs((*it).second, true);
|
||||
execplan::CalpontSystemCatalog::RIDList column_rid_list;
|
||||
|
||||
// Note a table may get dropped as you iterate over the list of tables.
|
||||
// So simply ignore the dropped table.
|
||||
try
|
||||
{
|
||||
column_rid_list = systemCatalogPtr->columnRIDs((*it).second, true);
|
||||
}
|
||||
catch (IDBExcept& ex)
|
||||
{
|
||||
if (ex.errorCode() == ERR_TABLE_NOT_IN_CATALOG)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
else
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
for (size_t col_num = 0; col_num < column_rid_list.size(); col_num++)
|
||||
{
|
||||
|
Reference in New Issue
Block a user