1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

Merge branch 'develop' into develop-merge-up-20190729

This commit is contained in:
Andrew Hutchings
2019-08-09 14:01:11 +01:00
committed by GitHub
23 changed files with 2612 additions and 2276 deletions

View File

@ -283,11 +283,13 @@ SET (ENGINE_WE_CONFIGCPP_INCLUDE "${CMAKE_CURRENT_SOURCE_DIR}/writeengine/x
SET (ENGINE_SERVER_SQL_INCLUDE "${SERVER_SOURCE_ROOT_DIR}/sql") SET (ENGINE_SERVER_SQL_INCLUDE "${SERVER_SOURCE_ROOT_DIR}/sql")
SET (ENGINE_SERVER_INCLUDE_INCLUDE "${SERVER_SOURCE_ROOT_DIR}/include") SET (ENGINE_SERVER_INCLUDE_INCLUDE "${SERVER_SOURCE_ROOT_DIR}/include")
SET (ENGINE_SERVER_PCRE_INCLUDE "${SERVER_BUILD_INCLUDE_DIR}/../pcre") SET (ENGINE_SERVER_PCRE_INCLUDE "${SERVER_BUILD_INCLUDE_DIR}/../pcre")
SET (ENGINE_SERVER_WSREP_INCLUDE "${SERVER_BUILD_INCLUDE_DIR}/../wsrep-lib/include")
SET (ENGINE_SERVER_WSREP_API_INCLUDE "${SERVER_BUILD_INCLUDE_DIR}/../wsrep-lib/wsrep-API/v26/")
SET (ENGINE_UTILS_UDFSDK_INCLUDE "${CMAKE_CURRENT_SOURCE_DIR}/utils/udfsdk") SET (ENGINE_UTILS_UDFSDK_INCLUDE "${CMAKE_CURRENT_SOURCE_DIR}/utils/udfsdk")
SET (ENGINE_DEFAULT_INCLUDES ${CMAKE_BINARY_DIR} "." "../" "../../" ${SERVER_BUILD_INCLUDE_DIR}) SET (ENGINE_DEFAULT_INCLUDES ${CMAKE_BINARY_DIR} "." "../" "../../" ${SERVER_BUILD_INCLUDE_DIR})
SET (ENGINE_COMMON_INCLUDES ${ENGINE_DEFAULT_INCLUDES} ${Boost_INCLUDE_DIR} ${LIBXML2_INCLUDE_DIR} ${ENGINE_UTILS_MESSAGEQCPP_INCLUDE} ${ENGINE_WE_SHARED_INCLUDE} ${ENGINE_UTILS_IDBDATAFILE_INCLUDE} ${ENGINE_UTILS_LOGGINGCPP_INCLUDE} ${ENGINE_UTILS_CONFIGCPP_INCLUDE} ${ENGINE_UTILS_COMPRESS_INCLUDE} ${ENGINE_VERSIONING_BRM_INCLUDE} ${ENGINE_UTILS_ROWGROUP_INCLUDE} ${ENGINE_UTILS_COMMON_INCLUDE} ${ENGINE_UTILS_DATACONVERT_INCLUDE} ${ENGINE_UTILS_RWLOCK_INCLUDE} ${ENGINE_UTILS_FUNCEXP_INCLUDE} ${ENGINE_OAMAPPS_ALARMMANAGER_INCLUDE} ${ENGINE_UTILS_INCLUDE} ${ENGINE_OAM_OAMCPP_INCLUDE} ${ENGINE_DBCON_DDLPKGPROC_INCLUDE} ${ENGINE_DBCON_DDLPKG_INCLUDE} ${ENGINE_DBCON_EXECPLAN_INCLUDE} ${ENGINE_UTILS_STARTUP_INCLUDE} ${ENGINE_DBCON_JOBLIST_INCLUDE} ${ENGINE_WE_WRAPPER_INCLUDE} ${ENGINE_WE_SERVER_INCLUDE} ${ENGINE_DBCON_DMLPKG_INCLUDE} ${ENGINE_WE_CLIENT_INCLUDE} ${ENGINE_DBCON_DMLPKGPROC_INCLUDE} ${ENGINE_UTILS_CACHEUTILS_INCLUDE} ${ENGINE_UTILS_MYSQLCL_INCLUDE} ${ENGINE_UTILS_QUERYTELE_INCLUDE} ${ENGINE_UTILS_THRIFT_INCLUDE} ${ENGINE_UTILS_JOINER_INCLUDE} ${ENGINE_UTILS_THREADPOOL_INCLUDE} ${ENGINE_UTILS_BATCHLDR_INCLUDE} ${ENGINE_UTILS_DDLCLEANUP_INCLUDE} ${ENGINE_UTILS_QUERYSTATS_INCLUDE} ${ENGINE_WE_CONFIGCPP_INCLUDE} ${ENGINE_SERVER_SQL_INCLUDE} ${ENGINE_SERVER_INCLUDE_INCLUDE} ${ENGINE_SERVER_PCRE_INCLUDE} ${ENGINE_UTILS_UDFSDK_INCLUDE} ${ENGINE_UTILS_LIBMYSQL_CL_INCLUDE}) SET (ENGINE_COMMON_INCLUDES ${ENGINE_DEFAULT_INCLUDES} ${Boost_INCLUDE_DIR} ${LIBXML2_INCLUDE_DIR} ${ENGINE_UTILS_MESSAGEQCPP_INCLUDE} ${ENGINE_WE_SHARED_INCLUDE} ${ENGINE_UTILS_IDBDATAFILE_INCLUDE} ${ENGINE_UTILS_LOGGINGCPP_INCLUDE} ${ENGINE_UTILS_CONFIGCPP_INCLUDE} ${ENGINE_UTILS_COMPRESS_INCLUDE} ${ENGINE_VERSIONING_BRM_INCLUDE} ${ENGINE_UTILS_ROWGROUP_INCLUDE} ${ENGINE_UTILS_COMMON_INCLUDE} ${ENGINE_UTILS_DATACONVERT_INCLUDE} ${ENGINE_UTILS_RWLOCK_INCLUDE} ${ENGINE_UTILS_FUNCEXP_INCLUDE} ${ENGINE_OAMAPPS_ALARMMANAGER_INCLUDE} ${ENGINE_UTILS_INCLUDE} ${ENGINE_OAM_OAMCPP_INCLUDE} ${ENGINE_DBCON_DDLPKGPROC_INCLUDE} ${ENGINE_DBCON_DDLPKG_INCLUDE} ${ENGINE_DBCON_EXECPLAN_INCLUDE} ${ENGINE_UTILS_STARTUP_INCLUDE} ${ENGINE_DBCON_JOBLIST_INCLUDE} ${ENGINE_WE_WRAPPER_INCLUDE} ${ENGINE_WE_SERVER_INCLUDE} ${ENGINE_DBCON_DMLPKG_INCLUDE} ${ENGINE_WE_CLIENT_INCLUDE} ${ENGINE_DBCON_DMLPKGPROC_INCLUDE} ${ENGINE_UTILS_CACHEUTILS_INCLUDE} ${ENGINE_UTILS_MYSQLCL_INCLUDE} ${ENGINE_UTILS_QUERYTELE_INCLUDE} ${ENGINE_UTILS_THRIFT_INCLUDE} ${ENGINE_UTILS_JOINER_INCLUDE} ${ENGINE_UTILS_THREADPOOL_INCLUDE} ${ENGINE_UTILS_BATCHLDR_INCLUDE} ${ENGINE_UTILS_DDLCLEANUP_INCLUDE} ${ENGINE_UTILS_QUERYSTATS_INCLUDE} ${ENGINE_WE_CONFIGCPP_INCLUDE} ${ENGINE_SERVER_SQL_INCLUDE} ${ENGINE_SERVER_INCLUDE_INCLUDE} ${ENGINE_SERVER_PCRE_INCLUDE} ${ENGINE_SERVER_WSREP_API_INCLUDE} ${ENGINE_SERVER_WSREP_INCLUDE} ${ENGINE_UTILS_UDFSDK_INCLUDE} ${ENGINE_UTILS_LIBMYSQL_CL_INCLUDE})
ADD_SUBDIRECTORY(utils) ADD_SUBDIRECTORY(utils)
ADD_SUBDIRECTORY(oam/oamcpp) ADD_SUBDIRECTORY(oam/oamcpp)

View File

@ -11,9 +11,6 @@ ADD_CUSTOM_COMMAND(
set_source_files_properties(ddl-scan.cpp PROPERTIES COMPILE_FLAGS -Wno-sign-compare) set_source_files_properties(ddl-scan.cpp PROPERTIES COMPILE_FLAGS -Wno-sign-compare)
# Parser puts extra info to stderr.
MY_CHECK_AND_SET_COMPILER_FLAG("-DYYDEBUG=1" DEBUG)
########### next target ############### ########### next target ###############
ADD_LIBRARY(ddlpackage SHARED ADD_LIBRARY(ddlpackage SHARED

View File

@ -22,6 +22,8 @@ SET ( libcalmysql_SRCS
ha_pseudocolumn.cpp) ha_pseudocolumn.cpp)
add_definitions(-DMYSQL_DYNAMIC_PLUGIN) add_definitions(-DMYSQL_DYNAMIC_PLUGIN)
add_definitions(-DEBUG_WALK_COND)
add_definitions(-DINFINIDB_DEBUG)
set_source_files_properties(ha_calpont.cpp PROPERTIES COMPILE_FLAGS "-fno-rtti -fno-implicit-templates") set_source_files_properties(ha_calpont.cpp PROPERTIES COMPILE_FLAGS "-fno-rtti -fno-implicit-templates")

View File

@ -21,6 +21,7 @@
#define NEED_CALPONT_EXTERNS #define NEED_CALPONT_EXTERNS
#include "ha_calpont_impl.h" #include "ha_calpont_impl.h"
#include "ha_mcs_pushdown.h"
static handler* calpont_create_handler(handlerton* hton, static handler* calpont_create_handler(handlerton* hton,
TABLE_SHARE* table, TABLE_SHARE* table,
@ -31,10 +32,19 @@ static int calpont_commit(handlerton* hton, THD* thd, bool all);
static int calpont_rollback(handlerton* hton, THD* thd, bool all); static int calpont_rollback(handlerton* hton, THD* thd, bool all);
static int calpont_close_connection ( handlerton* hton, THD* thd ); static int calpont_close_connection ( handlerton* hton, THD* thd );
handlerton* calpont_hton; handlerton* calpont_hton;
handlerton* mcs_hton;
// handlers creation function for hton.
// Look into ha_mcs_pushdown.* for more details.
static group_by_handler* static group_by_handler*
create_calpont_group_by_handler(THD* thd, Query* query); create_calpont_group_by_handler(THD* thd, Query* query);
static derived_handler*
create_columnstore_derived_handler(THD* thd, TABLE_LIST *derived);
static select_handler*
create_columnstore_select_handler(THD* thd, SELECT_LEX* sel);
/* Variables for example share methods */ /* Variables for example share methods */
/* /*
@ -68,6 +78,7 @@ static uchar* calpont_get_key(INFINIDB_SHARE* share, size_t* length,
return (uchar*) share->table_name; return (uchar*) share->table_name;
} }
// This one is unused
int calpont_discover(handlerton* hton, THD* thd, TABLE_SHARE* share) int calpont_discover(handlerton* hton, THD* thd, TABLE_SHARE* share)
{ {
DBUG_ENTER("calpont_discover"); DBUG_ENTER("calpont_discover");
@ -94,6 +105,7 @@ int calpont_discover(handlerton* hton, THD* thd, TABLE_SHARE* share)
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
} }
// This f() is also unused
int calpont_discover_existence(handlerton* hton, const char* db, int calpont_discover_existence(handlerton* hton, const char* db,
const char* table_name) const char* table_name)
{ {
@ -107,6 +119,7 @@ static int columnstore_init_func(void* p)
struct tm tm; struct tm tm;
time_t t; time_t t;
time(&t); time(&t);
localtime_r(&t, &tm); localtime_r(&t, &tm);
fprintf(stderr, "%02d%02d%02d %2d:%02d:%02d ", fprintf(stderr, "%02d%02d%02d %2d:%02d:%02d ",
@ -115,22 +128,24 @@ static int columnstore_init_func(void* p)
fprintf(stderr, "Columnstore: Started; Version: %s-%s\n", columnstore_version.c_str(), columnstore_release.c_str()); fprintf(stderr, "Columnstore: Started; Version: %s-%s\n", columnstore_version.c_str(), columnstore_release.c_str());
calpont_hton = (handlerton*)p; mcs_hton = (handlerton*)p;
#ifndef _MSC_VER #ifndef _MSC_VER
(void) pthread_mutex_init(&calpont_mutex, MY_MUTEX_INIT_FAST); (void) pthread_mutex_init(&calpont_mutex, MY_MUTEX_INIT_FAST);
#endif #endif
(void) my_hash_init(&calpont_open_tables, system_charset_info, 32, 0, 0, (void) my_hash_init(&calpont_open_tables, system_charset_info, 32, 0, 0,
(my_hash_get_key) calpont_get_key, 0, 0); (my_hash_get_key) calpont_get_key, 0, 0);
calpont_hton->state = SHOW_OPTION_YES; mcs_hton->state = SHOW_OPTION_YES;
calpont_hton->create = calpont_create_handler; mcs_hton->create = calpont_create_handler;
calpont_hton->flags = HTON_CAN_RECREATE; mcs_hton->flags = HTON_CAN_RECREATE;
// calpont_hton->discover_table= calpont_discover; // mcs_hton->discover_table= calpont_discover;
// calpont_hton->discover_table_existence= calpont_discover_existence; // mcs_hton->discover_table_existence= calpont_discover_existence;
calpont_hton->commit = calpont_commit; mcs_hton->commit = calpont_commit;
calpont_hton->rollback = calpont_rollback; mcs_hton->rollback = calpont_rollback;
calpont_hton->close_connection = calpont_close_connection; mcs_hton->close_connection = calpont_close_connection;
calpont_hton->create_group_by = create_calpont_group_by_handler; //mcs_hton->create_group_by = create_calpont_group_by_handler;
mcs_hton->create_derived = create_columnstore_derived_handler;
mcs_hton->create_select = create_columnstore_select_handler;
DBUG_RETURN(0); DBUG_RETURN(0);
} }
@ -159,6 +174,10 @@ static int infinidb_init_func(void* p)
calpont_hton->commit = calpont_commit; calpont_hton->commit = calpont_commit;
calpont_hton->rollback = calpont_rollback; calpont_hton->rollback = calpont_rollback;
calpont_hton->close_connection = calpont_close_connection; calpont_hton->close_connection = calpont_close_connection;
calpont_hton->create_group_by = create_calpont_group_by_handler;
//calpont_hton->create_derived = create_columnstore_derived_handler;
calpont_hton->create_select = create_columnstore_select_handler;
DBUG_RETURN(0); DBUG_RETURN(0);
} }
@ -264,10 +283,6 @@ int ha_calpont::open(const char* name, int mode, uint32_t test_if_locked)
{ {
DBUG_ENTER("ha_calpont::open"); DBUG_ENTER("ha_calpont::open");
//if (!(share = get_share(name, table)))
// DBUG_RETURN(1);
//thr_lock_data_init(&share->lock,&lock,NULL);
int rc = ha_calpont_impl_open(name, mode, test_if_locked); int rc = ha_calpont_impl_open(name, mode, test_if_locked);
DBUG_RETURN(rc); DBUG_RETURN(rc);
@ -293,7 +308,6 @@ int ha_calpont::open(const char* name, int mode, uint32_t test_if_locked)
int ha_calpont::close(void) int ha_calpont::close(void)
{ {
DBUG_ENTER("ha_calpont::close"); DBUG_ENTER("ha_calpont::close");
//DBUG_RETURN(free_share(share));
int rc = ha_calpont_impl_close(); int rc = ha_calpont_impl_close();
@ -511,7 +525,11 @@ int ha_calpont::rnd_init(bool scan)
{ {
DBUG_ENTER("ha_calpont::rnd_init"); DBUG_ENTER("ha_calpont::rnd_init");
int rc = ha_calpont_impl_rnd_init(table); int rc = 0;
if(scan)
{
rc = ha_calpont_impl_rnd_init(table);
}
DBUG_RETURN(rc); DBUG_RETURN(rc);
} }
@ -929,256 +947,7 @@ struct st_mysql_storage_engine columnstore_storage_engine =
struct st_mysql_storage_engine infinidb_storage_engine = struct st_mysql_storage_engine infinidb_storage_engine =
{ MYSQL_HANDLERTON_INTERFACE_VERSION }; { MYSQL_HANDLERTON_INTERFACE_VERSION };
/*@brief check_walk - It traverses filter conditions*/ #include "ha_mcs_pushdown.cpp"
/************************************************************
* DESCRIPTION:
* It traverses filter predicates looking for unsupported
* JOIN types: non-equi JOIN, e.g t1.c1 > t2.c2;
* logical OR.
* PARAMETERS:
* thd - THD pointer.
* derived - TABLE_LIST* to work with.
* RETURN:
* derived_handler if possible
* NULL in other case
***********************************************************/
void check_walk(const Item* item, void* arg)
{
bool* unsupported_feature = static_cast<bool*>(arg);
if ( *unsupported_feature )
return;
switch (item->type())
{
case Item::FUNC_ITEM:
{
const Item_func* ifp = static_cast<const Item_func*>(item);
if ( ifp->functype() != Item_func::EQ_FUNC ) // NON-equi JOIN
{
if ( ifp->argument_count() == 2 &&
ifp->arguments()[0]->type() == Item::FIELD_ITEM &&
ifp->arguments()[1]->type() == Item::FIELD_ITEM )
{
Item_field* left= static_cast<Item_field*>(ifp->arguments()[0]);
Item_field* right= static_cast<Item_field*>(ifp->arguments()[1]);
if ( left->field->table != right->field->table )
{
*unsupported_feature = true;
return;
}
}
else // IN + correlated subquery
{
if ( ifp->functype() == Item_func::NOT_FUNC
&& ifp->arguments()[0]->type() == Item::EXPR_CACHE_ITEM )
{
check_walk(ifp->arguments()[0], arg);
}
}
}
break;
}
case Item::EXPR_CACHE_ITEM: // IN + correlated subquery
{
const Item_cache_wrapper* icw = static_cast<const Item_cache_wrapper*>(item);
if ( icw->get_orig_item()->type() == Item::FUNC_ITEM )
{
const Item_func *ifp = static_cast<const Item_func*>(icw->get_orig_item());
if ( ifp->argument_count() == 2 &&
( ifp->arguments()[0]->type() == Item::Item::SUBSELECT_ITEM
|| ifp->arguments()[1]->type() == Item::Item::SUBSELECT_ITEM ))
{
*unsupported_feature = true;
return;
}
}
break;
}
case Item::COND_ITEM: // OR in cods is unsupported yet
{
Item_cond* icp = (Item_cond*)item;
if ( is_cond_or(icp) )
{
*unsupported_feature = true;
}
break;
}
default:
{
break;
}
}
}
/*@brief create_calpont_group_by_handler- Creates handler*/
/***********************************************************
* DESCRIPTION:
* Creates a group_by pushdown handler if there is no:
* non-equi JOIN, e.g * t1.c1 > t2.c2
* logical OR in the filter predicates
* Impossible WHERE
* Impossible HAVING
* and there is either GROUP BY or aggregation function
* exists at the top level.
* Valid queries with the last two crashes the server if
* processed.
* Details are in server/sql/group_by_handler.h
* PARAMETERS:
* thd - THD pointer
* query - Query structure LFM in group_by_handler.h
* RETURN:
* group_by_handler if success
* NULL in other case
***********************************************************/
static group_by_handler*
create_calpont_group_by_handler(THD* thd, Query* query)
{
ha_calpont_group_by_handler* handler = NULL;
// same as thd->lex->current_select
SELECT_LEX *select_lex = query->from->select_lex;
// Create a handler if query is valid. See comments for details.
if ( thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE
&& ( thd->variables.infinidb_vtable_mode == 0
|| thd->variables.infinidb_vtable_mode == 2 )
&& ( query->group_by || select_lex->with_sum_func ) )
{
bool unsupported_feature = false;
// revisit SELECT_LEX for all units
for(TABLE_LIST* tl = query->from; !unsupported_feature && tl; tl = tl->next_global)
{
select_lex = tl->select_lex;
// Correlation subquery. Comming soon so fail on this yet.
unsupported_feature = select_lex->is_correlated;
// Impossible HAVING or WHERE
if ( ( !unsupported_feature && query->having && select_lex->having_value == Item::COND_FALSE )
|| ( select_lex->cond_count > 0
&& select_lex->cond_value == Item::COND_FALSE ) )
{
unsupported_feature = true;
}
// Unsupported JOIN conditions
if ( !unsupported_feature )
{
JOIN *join = select_lex->join;
Item_cond *icp = 0;
if (join != 0)
icp = reinterpret_cast<Item_cond*>(join->conds);
if ( unsupported_feature == false
&& icp )
{
icp->traverse_cond(check_walk, &unsupported_feature, Item::POSTFIX);
}
// Optimizer could move some join conditions into where
if (select_lex->where != 0)
icp = reinterpret_cast<Item_cond*>(select_lex->where);
if ( unsupported_feature == false
&& icp )
{
icp->traverse_cond(check_walk, &unsupported_feature, Item::POSTFIX);
}
}
} // unsupported features check ends here
if ( !unsupported_feature )
{
handler = new ha_calpont_group_by_handler(thd, query);
// Notify the server, that CS handles GROUP BY, ORDER BY and HAVING clauses.
query->group_by = NULL;
query->order_by = NULL;
query->having = NULL;
}
}
return handler;
}
/***********************************************************
* DESCRIPTION:
* GROUP BY handler constructor
* PARAMETERS:
* thd - THD pointer.
* query - Query describing structure
***********************************************************/
ha_calpont_group_by_handler::ha_calpont_group_by_handler(THD* thd_arg, Query* query)
: group_by_handler(thd_arg, calpont_hton),
select(query->select),
table_list(query->from),
distinct(query->distinct),
where(query->where),
group_by(query->group_by),
order_by(query->order_by),
having(query->having)
{
}
/***********************************************************
* DESCRIPTION:
* GROUP BY destructor
***********************************************************/
ha_calpont_group_by_handler::~ha_calpont_group_by_handler()
{
}
/***********************************************************
* DESCRIPTION:
* Makes the plan and prepares the data
* RETURN:
* int rc
***********************************************************/
int ha_calpont_group_by_handler::init_scan()
{
DBUG_ENTER("ha_calpont_group_by_handler::init_scan");
// Save vtable_state to restore the after we inited.
THD::infinidb_state oldState = thd->infinidb_vtable.vtable_state;
// MCOL-1052 Should be removed after cleaning the code up.
thd->infinidb_vtable.vtable_state = THD::INFINIDB_CREATE_VTABLE;
int rc = ha_calpont_impl_group_by_init(this, table);
thd->infinidb_vtable.vtable_state = oldState;
DBUG_RETURN(rc);
}
/***********************************************************
* DESCRIPTION:
* Fetches a row and saves it to a temporary table.
* RETURN:
* int rc
***********************************************************/
int ha_calpont_group_by_handler::next_row()
{
DBUG_ENTER("ha_calpont_group_by_handler::next_row");
int rc = ha_calpont_impl_group_by_next(this, table);
DBUG_RETURN(rc);
}
/***********************************************************
* DESCRIPTION:
* Shuts the scan down.
* RETURN:
* int rc
***********************************************************/
int ha_calpont_group_by_handler::end_scan()
{
DBUG_ENTER("ha_calpont_group_by_handler::end_scan");
int rc = ha_calpont_impl_group_by_end(this, table);
DBUG_RETURN(rc);
}
mysql_declare_plugin(columnstore) mysql_declare_plugin(columnstore)
{ {

View File

@ -22,9 +22,10 @@
#include "ha_mcs_sysvars.h" #include "ha_mcs_sysvars.h"
extern handlerton* calpont_hton; extern handlerton* calpont_hton;
extern handlerton* mcs_hton;
/** @brief /** @brief
This structure will be shared among all open handlers. INFINIDB_SHARE is a structure that will be shared among all open handlers.
This example implements the minimum of what you will probably need. This example implements the minimum of what you will probably need.
*/ */
typedef struct st_calpont_share typedef struct st_calpont_share
@ -228,51 +229,4 @@ public:
} }
}; };
/*@brief group_by_handler class*/
/***********************************************************
* DESCRIPTION:
* Provides server with group_by_handler API methods.
* One should read comments in server/sql/group_by_handler.h
* Attributes:
* select - attribute contains all GROUP BY, HAVING, ORDER items and calls it
* an extended SELECT list according to comments in
* server/sql/group_handler.cc.
* So the temporary table for
* select count(*) from b group by a having a > 3 order by a
* will have 4 columns not 1.
* However server ignores all NULLs used in
* GROUP BY, HAVING, ORDER.
* select_list_descr - contains Item description returned by Item->print()
* that is used in lookup for corresponding columns in
* extended SELECT list.
* table_list - contains all tables involved. Must be CS tables only.
* distinct - looks like a useless thing for now. Couldn't get it set by server.
* where - where items.
* group_by - group by ORDER items.
* order_by - order by ORDER items.
* having - having Item.
* Methods:
* init_scan - get plan and send it to ExeMgr. Get the execution result.
* next_row - get a row back from sm.
* end_scan - finish and clean the things up.
***********************************************************/
class ha_calpont_group_by_handler: public group_by_handler
{
public:
ha_calpont_group_by_handler(THD* thd_arg, Query* query);
~ha_calpont_group_by_handler();
int init_scan();
int next_row();
int end_scan();
List<Item>* select;
TABLE_LIST* table_list;
bool distinct;
Item* where;
ORDER* group_by;
ORDER* order_by;
Item* having;
};
#endif //HA_CALPONT_H__ #endif //HA_CALPONT_H__

View File

@ -2306,17 +2306,6 @@ int ha_calpont_impl_create_(const char* name, TABLE* table_arg, HA_CREATE_INFO*
bool schemaSyncOnly = false; bool schemaSyncOnly = false;
bool isCreate = true; bool isCreate = true;
// relate to bug 1793. Make sure this is not for a select statement because
if (db == "calpontsys" && thd->infinidb_vtable.vtable_state == THD::INFINIDB_INIT
&& tbl != "systable"
&& tbl != "syscolumn" && tbl != "sysindex"
&& tbl != "sysconstraint" && tbl != "sysindexcol"
&& tbl != "sysconstraintcol" )
{
setError(thd, ER_INTERNAL_ERROR, "Cannot create non-system Calpont tables in calpontsys database");
return 1;
}
regex pat("[[:space:]]*SCHEMA[[:space:]]+SYNC[[:space:]]+ONLY", regex_constants::extended); regex pat("[[:space:]]*SCHEMA[[:space:]]+SYNC[[:space:]]+ONLY", regex_constants::extended);
if (regex_search(tablecomment, pat)) if (regex_search(tablecomment, pat))
@ -2336,11 +2325,6 @@ int ha_calpont_impl_create_(const char* name, TABLE* table_arg, HA_CREATE_INFO*
#endif #endif
return 0; return 0;
} }
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ALTER_VTABLE) //check if it is select
{
return 0;
}
} }
else else
{ {

View File

@ -2110,10 +2110,6 @@ int ha_calpont_impl_commit_ (handlerton* hton, THD* thd, bool all, cal_connectio
{ {
int rc = 0; int rc = 0;
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ALTER_VTABLE ||
thd->infinidb_vtable.vtable_state == THD::INFINIDB_SELECT_VTABLE )
return rc;
if (thd->slave_thread && !ci.replicationEnabled) if (thd->slave_thread && !ci.replicationEnabled)
return 0; return 0;

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -20,6 +20,7 @@
#define HA_CALPONT_IMPL_H__ #define HA_CALPONT_IMPL_H__
#include "idb_mysql.h" #include "idb_mysql.h"
#include "ha_mcs_pushdown.h"
#ifdef NEED_CALPONT_EXTERNS #ifdef NEED_CALPONT_EXTERNS
extern int ha_calpont_impl_discover_existence(const char* schema, const char* name); extern int ha_calpont_impl_discover_existence(const char* schema, const char* name);
@ -29,7 +30,7 @@ extern int ha_calpont_impl_open(const char* name, int mode, uint32_t test_if_loc
extern int ha_calpont_impl_close(void); extern int ha_calpont_impl_close(void);
extern int ha_calpont_impl_rnd_init(TABLE* table); extern int ha_calpont_impl_rnd_init(TABLE* table);
extern int ha_calpont_impl_rnd_next(uchar* buf, TABLE* table); extern int ha_calpont_impl_rnd_next(uchar* buf, TABLE* table);
extern int ha_calpont_impl_rnd_end(TABLE* table); extern int ha_calpont_impl_rnd_end(TABLE* table, bool is_derived_hand = false);
extern int ha_calpont_impl_write_row(uchar* buf, TABLE* table); extern int ha_calpont_impl_write_row(uchar* buf, TABLE* table);
extern void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table); extern void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table);
extern int ha_calpont_impl_end_bulk_insert(bool abort, TABLE* table); extern int ha_calpont_impl_end_bulk_insert(bool abort, TABLE* table);
@ -45,6 +46,7 @@ extern int ha_calpont_impl_rnd_pos(uchar* buf, uchar* pos);
extern int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE* table); extern int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE* table);
extern int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE* table); extern int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE* table);
extern int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* table); extern int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* table);
extern int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info , TABLE* table);
#endif #endif
@ -52,6 +54,7 @@ extern int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand,
#include "ha_calpont_impl_if.h" #include "ha_calpont_impl_if.h"
#include "calpontsystemcatalog.h" #include "calpontsystemcatalog.h"
#include "ha_calpont.h" #include "ha_calpont.h"
#include "ha_mcs_pushdown.h"
extern int ha_calpont_impl_rename_table_(const char* from, const char* to, cal_impl_if::cal_connection_info& ci); extern int ha_calpont_impl_rename_table_(const char* from, const char* to, cal_impl_if::cal_connection_info& ci);
extern int ha_calpont_impl_write_row_(uchar* buf, TABLE* table, cal_impl_if::cal_connection_info& ci, ha_rows& rowsInserted); extern int ha_calpont_impl_write_row_(uchar* buf, TABLE* table, cal_impl_if::cal_connection_info& ci, ha_rows& rowsInserted);
extern int ha_calpont_impl_write_batch_row_(uchar* buf, TABLE* table, cal_impl_if::cal_connection_info& ci); extern int ha_calpont_impl_write_batch_row_(uchar* buf, TABLE* table, cal_impl_if::cal_connection_info& ci);
@ -71,6 +74,7 @@ extern std::string ha_calpont_impl_cleartablelock( cal_impl_if::cal_connection_
extern int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE* table); extern int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE* table);
extern int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE* table); extern int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE* table);
extern int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* table); extern int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* table);
extern int ha_cs_impl_derived_next(TABLE* table);
#endif #endif
#endif #endif

View File

@ -150,6 +150,8 @@ struct gp_walk_info
// Kludge for MCOL-1472 // Kludge for MCOL-1472
bool inCaseStmt; bool inCaseStmt;
bool cs_vtable_is_update_with_derive;
bool cs_vtable_impossible_where_on_union;
gp_walk_info() : sessionid(0), gp_walk_info() : sessionid(0),
fatalParseError(false), fatalParseError(false),
@ -166,8 +168,10 @@ struct gp_walk_info
lastSub(0), lastSub(0),
derivedTbCnt(0), derivedTbCnt(0),
recursionLevel(-1), recursionLevel(-1),
recursionHWM(0), recursionHWM(0),
inCaseStmt(false) inCaseStmt(false),
cs_vtable_is_update_with_derive(false),
cs_vtable_impossible_where_on_union(false)
{} {}
~gp_walk_info() {} ~gp_walk_info() {}
@ -286,6 +290,7 @@ struct cal_connection_info
std::stack<sm::cpsm_conhdl_t*> cal_conn_hndl_st; std::stack<sm::cpsm_conhdl_t*> cal_conn_hndl_st;
int queryState; int queryState;
CalTableMap tableMap; CalTableMap tableMap;
std::set<TABLE*> physTablesList;
sm::tableid_t currentTable; sm::tableid_t currentTable;
uint32_t traceFlags; uint32_t traceFlags;
std::string queryStats; std::string queryStats;
@ -337,14 +342,16 @@ const std::string infinidb_err_msg = "\nThe query includes syntax that is not su
int cp_get_plan(THD* thd, execplan::SCSEP& csep); int cp_get_plan(THD* thd, execplan::SCSEP& csep);
int cp_get_table_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_table_info& ti); int cp_get_table_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_table_info& ti);
int cp_get_group_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_group_info& gi); int cp_get_group_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_group_info& gi);
int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, bool isUnion = false); int cs_get_derived_plan(derived_handler* handler, THD* thd, execplan::SCSEP& csep, gp_walk_info& gwi);
int cs_get_select_plan(select_handler* handler, THD* thd, execplan::SCSEP& csep, gp_walk_info& gwi);
int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, bool isUnion = false, bool isPushdownHand = false);
int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, cal_group_info& gi, bool isUnion = false); int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, cal_group_info& gi, bool isUnion = false);
void setError(THD* thd, uint32_t errcode, const std::string errmsg, gp_walk_info* gwi); void setError(THD* thd, uint32_t errcode, const std::string errmsg, gp_walk_info* gwi);
void setError(THD* thd, uint32_t errcode, const std::string errmsg); void setError(THD* thd, uint32_t errcode, const std::string errmsg);
void gp_walk(const Item* item, void* arg); void gp_walk(const Item* item, void* arg);
void parse_item (Item* item, std::vector<Item_field*>& field_vec, bool& hasNonSupportItem, uint16& parseInfo, gp_walk_info* gwip = NULL); void parse_item (Item* item, std::vector<Item_field*>& field_vec, bool& hasNonSupportItem, uint16& parseInfo, gp_walk_info* gwip = NULL);
const std::string bestTableName(const Item_field* ifp); const std::string bestTableName(const Item_field* ifp);
bool isInfiniDB(TABLE* table_ptr); bool isMCSTable(TABLE* table_ptr);
// execution plan util functions prototypes // execution plan util functions prototypes
execplan::ReturnedColumn* buildReturnedColumn(Item* item, gp_walk_info& gwi, bool& nonSupport, bool pushdownHand = false); execplan::ReturnedColumn* buildReturnedColumn(Item* item, gp_walk_info& gwi, bool& nonSupport, bool pushdownHand = false);

View File

@ -322,9 +322,12 @@ ParseTree* setDerivedFilter(THD* thd, ParseTree*& n,
FromSubQuery::FromSubQuery(gp_walk_info& gwip) : SubQuery(gwip) FromSubQuery::FromSubQuery(gp_walk_info& gwip) : SubQuery(gwip)
{} {}
FromSubQuery::FromSubQuery(gp_walk_info& gwip, SELECT_LEX* sub) : FromSubQuery::FromSubQuery(gp_walk_info& gwip,
SubQuery(gwip), SELECT_LEX* sub,
fFromSub(sub) bool isPushdownHandler) :
SubQuery(gwip),
fFromSub(sub),
fPushdownHand(isPushdownHandler)
{} {}
FromSubQuery::~FromSubQuery() FromSubQuery::~FromSubQuery()
@ -346,7 +349,7 @@ SCSEP FromSubQuery::transform()
csep->derivedTbAlias(fAlias); // always lower case csep->derivedTbAlias(fAlias); // always lower case
csep->derivedTbView(fGwip.viewName.alias); csep->derivedTbView(fGwip.viewName.alias);
if (getSelectPlan(gwi, *fFromSub, csep) != 0) if (getSelectPlan(gwi, *fFromSub, csep, fPushdownHand) != 0)
{ {
fGwip.fatalParseError = true; fGwip.fatalParseError = true;

View File

@ -0,0 +1,620 @@
/*
Copyright (c) 2019 MariaDB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
// ha_calpont.cpp includes this file.
void mutate_optimizer_flags(THD *thd_)
{
// MCOL-2178 Disable all optimizer flags as it was in the fork.
// CS restores it later in SH::scan_end() and in case of an error
// in SH::scan_init()
set_original_optimizer_flags(thd_->variables.optimizer_switch, thd_);
thd_->variables.optimizer_switch = OPTIMIZER_SWITCH_IN_TO_EXISTS |
OPTIMIZER_SWITCH_EXISTS_TO_IN |
OPTIMIZER_SWITCH_COND_PUSHDOWN_FOR_DERIVED;
}
void restore_optimizer_flags(THD *thd_)
{
// MCOL-2178 restore original optimizer flags after SH, DH
ulonglong orig_flags = get_original_optimizer_flags(thd_);
if (orig_flags)
{
thd_->variables.optimizer_switch = orig_flags;
set_original_optimizer_flags(0, thd_);
}
}
/*@brief check_walk - It traverses filter conditions*/
/************************************************************
* DESCRIPTION:
* It traverses filter predicates looking for unsupported
* JOIN types: non-equi JOIN, e.g t1.c1 > t2.c2;
* logical OR.
* PARAMETERS:
* thd - THD pointer.
* derived - TABLE_LIST* to work with.
* RETURN:
* derived_handler if possible
* NULL in other case
***********************************************************/
void check_walk(const Item* item, void* arg)
{
bool* unsupported_feature = static_cast<bool*>(arg);
if ( *unsupported_feature )
return;
switch (item->type())
{
case Item::FUNC_ITEM:
{
const Item_func* ifp = static_cast<const Item_func*>(item);
if ( ifp->functype() != Item_func::EQ_FUNC ) // NON-equi JOIN
{
if ( ifp->argument_count() == 2 &&
ifp->arguments()[0]->type() == Item::FIELD_ITEM &&
ifp->arguments()[1]->type() == Item::FIELD_ITEM )
{
Item_field* left= static_cast<Item_field*>(ifp->arguments()[0]);
Item_field* right= static_cast<Item_field*>(ifp->arguments()[1]);
if ( left->field->table != right->field->table )
{
*unsupported_feature = true;
return;
}
}
else // IN + correlated subquery
{
if ( ifp->functype() == Item_func::NOT_FUNC
&& ifp->arguments()[0]->type() == Item::EXPR_CACHE_ITEM )
{
check_walk(ifp->arguments()[0], arg);
}
}
}
break;
}
case Item::EXPR_CACHE_ITEM: // IN + correlated subquery
{
const Item_cache_wrapper* icw = static_cast<const Item_cache_wrapper*>(item);
if ( icw->get_orig_item()->type() == Item::FUNC_ITEM )
{
const Item_func *ifp = static_cast<const Item_func*>(icw->get_orig_item());
if ( ifp->argument_count() == 2 &&
( ifp->arguments()[0]->type() == Item::Item::SUBSELECT_ITEM
|| ifp->arguments()[1]->type() == Item::Item::SUBSELECT_ITEM ))
{
*unsupported_feature = true;
return;
}
}
break;
}
case Item::COND_ITEM: // OR in JOIN conds is unsupported yet
{
Item_cond* icp = (Item_cond*)item;
if ( is_cond_or(icp) )
{
*unsupported_feature = true;
}
break;
}
default:
{
break;
}
}
}
/*@brief create_calpont_group_by_handler- Creates handler*/
/***********************************************************
* DESCRIPTION:
* Creates a group_by pushdown handler if there is no:
* non-equi JOIN, e.g * t1.c1 > t2.c2
* logical OR in the filter predicates
* Impossible WHERE
* Impossible HAVING
* and there is either GROUP BY or aggregation function
* exists at the top level.
* Valid queries with the last two crashes the server if
* processed.
* Details are in server/sql/group_by_handler.h
* PARAMETERS:
* thd - THD pointer
* query - Query structure LFM in group_by_handler.h
* RETURN:
* group_by_handler if success
* NULL in other case
***********************************************************/
static group_by_handler*
create_calpont_group_by_handler(THD* thd, Query* query)
{
ha_calpont_group_by_handler* handler = NULL;
// MCOL-2178 Disable SP support in the group_by_handler for now
// Check the session variable value to enable/disable use of
// group_by_handler
if (!get_group_by_handler(thd) || (thd->lex)->sphead)
{
return handler;
}
// same as thd->lex->current_select
SELECT_LEX *select_lex = query->from->select_lex;
// Create a handler if query is valid. See comments for details.
if ( query->group_by || select_lex->with_sum_func )
{
bool unsupported_feature = false;
// revisit SELECT_LEX for all units
for(TABLE_LIST* tl = query->from; !unsupported_feature && tl; tl = tl->next_global)
{
select_lex = tl->select_lex;
// Correlation subquery. Comming soon so fail on this yet.
unsupported_feature = select_lex->is_correlated;
// Impossible HAVING or WHERE
if ( ( !unsupported_feature && query->having && select_lex->having_value == Item::COND_FALSE )
|| ( select_lex->cond_count > 0
&& select_lex->cond_value == Item::COND_FALSE ) )
{
unsupported_feature = true;
}
// Unsupported JOIN conditions
if ( !unsupported_feature )
{
JOIN *join = select_lex->join;
Item_cond *icp = 0;
if (join != 0)
icp = reinterpret_cast<Item_cond*>(join->conds);
if ( unsupported_feature == false
&& icp )
{
icp->traverse_cond(check_walk, &unsupported_feature, Item::POSTFIX);
}
// Optimizer could move some join conditions into where
if (select_lex->where != 0)
icp = reinterpret_cast<Item_cond*>(select_lex->where);
if ( unsupported_feature == false
&& icp )
{
icp->traverse_cond(check_walk, &unsupported_feature, Item::POSTFIX);
}
}
} // unsupported features check ends here
if ( !unsupported_feature )
{
handler = new ha_calpont_group_by_handler(thd, query);
// Notify the server, that CS handles GROUP BY, ORDER BY and HAVING clauses.
query->group_by = NULL;
query->order_by = NULL;
query->having = NULL;
}
}
return handler;
}
/*@brief create_columnstore_derived_handler- Creates handler*/
/************************************************************
* DESCRIPTION:
* Creates a derived handler if there is no non-equi JOIN, e.g
* t1.c1 > t2.c2 and logical OR in the filter predicates.
* More details in server/sql/derived_handler.h
* PARAMETERS:
* thd - THD pointer.
* derived - TABLE_LIST* to work with.
* RETURN:
* derived_handler if possible
* NULL in other case
***********************************************************/
static derived_handler*
create_columnstore_derived_handler(THD* thd, TABLE_LIST *derived)
{
ha_columnstore_derived_handler* handler = NULL;
// MCOL-2178 Disable SP support in the derived_handler for now
// Check the session variable value to enable/disable use of
// derived_handler
if (!get_derived_handler(thd) || (thd->lex)->sphead)
{
return handler;
}
SELECT_LEX_UNIT *unit= derived->derived;
bool unsupported_feature = false;
{
SELECT_LEX select_lex = *unit->first_select();
JOIN* join = select_lex.join;
Item_cond* icp = 0;
if (join != 0)
icp = reinterpret_cast<Item_cond*>(join->conds);
if (!join)
{
icp = reinterpret_cast<Item_cond*>(select_lex.where);
}
if ( icp )
{
//icp->traverse_cond(check_walk, &unsupported_feature, Item::POSTFIX);
}
}
if ( !unsupported_feature )
handler= new ha_columnstore_derived_handler(thd, derived);
return handler;
}
/***********************************************************
* DESCRIPTION:
* derived_handler constructor
* PARAMETERS:
* thd - THD pointer.
* tbl - tables involved.
***********************************************************/
ha_columnstore_derived_handler::ha_columnstore_derived_handler(THD *thd,
TABLE_LIST *dt)
: derived_handler(thd, calpont_hton)
{
derived = dt;
}
/***********************************************************
* DESCRIPTION:
* derived_handler destructor
***********************************************************/
ha_columnstore_derived_handler::~ha_columnstore_derived_handler()
{}
/*@brief Initiate the query for derived_handler */
/***********************************************************
* DESCRIPTION:
* Execute the query and saves derived table query.
* PARAMETERS:
*
* RETURN:
* rc as int
***********************************************************/
int ha_columnstore_derived_handler::init_scan()
{
char query_buff[4096];
DBUG_ENTER("ha_columnstore_derived_handler::init_scan");
// Save query for logging
String derived_query(query_buff, sizeof(query_buff), thd->charset());
derived_query.length(0);
derived->derived->print(&derived_query, QT_ORDINARY);
mcs_handler_info mhi = mcs_handler_info(static_cast<void*>(this), DERIVED);
// this::table is the place for the result set
int rc = ha_cs_impl_pushdown_init(&mhi, table);
DBUG_RETURN(rc);
}
/*@brief Fetch next row for derived_handler */
/***********************************************************
* DESCRIPTION:
* Fetches next row and saves it in the temp table
* PARAMETERS:
*
* RETURN:
* rc as int
*
***********************************************************/
int ha_columnstore_derived_handler::next_row()
{
DBUG_ENTER("ha_columnstore_derived_handler::next_row");
int rc = ha_calpont_impl_rnd_next(table->record[0], table);
DBUG_RETURN(rc);
}
/*@brief Finishes the scan and clean it up */
/***********************************************************
* DESCRIPTION:
* Finishes the scan for derived handler
* PARAMETERS:
*
* RETURN:
* rc as int
*
***********************************************************/
int ha_columnstore_derived_handler::end_scan()
{
DBUG_ENTER("ha_columnstore_derived_handler::end_scan");
int rc = ha_calpont_impl_rnd_end(table, true);
DBUG_RETURN(rc);
}
void ha_columnstore_derived_handler::print_error(int, unsigned long)
{
}
/***********************************************************
* DESCRIPTION:
* GROUP BY handler constructor
* PARAMETERS:
* thd - THD pointer.
* query - Query describing structure
***********************************************************/
ha_calpont_group_by_handler::ha_calpont_group_by_handler(THD* thd_arg, Query* query)
: group_by_handler(thd_arg, calpont_hton),
select(query->select),
table_list(query->from),
distinct(query->distinct),
where(query->where),
group_by(query->group_by),
order_by(query->order_by),
having(query->having)
{
}
/***********************************************************
* DESCRIPTION:
* GROUP BY destructor
***********************************************************/
ha_calpont_group_by_handler::~ha_calpont_group_by_handler()
{
}
/***********************************************************
* DESCRIPTION:
* Makes the plan and prepares the data
* RETURN:
* int rc
***********************************************************/
int ha_calpont_group_by_handler::init_scan()
{
DBUG_ENTER("ha_calpont_group_by_handler::init_scan");
int rc = ha_calpont_impl_group_by_init(this, table);
DBUG_RETURN(rc);
}
/***********************************************************
* DESCRIPTION:
* Fetches a row and saves it to a temporary table.
* RETURN:
* int rc
***********************************************************/
int ha_calpont_group_by_handler::next_row()
{
DBUG_ENTER("ha_calpont_group_by_handler::next_row");
int rc = ha_calpont_impl_group_by_next(this, table);
DBUG_RETURN(rc);
}
/***********************************************************
* DESCRIPTION:
* Shuts the scan down.
* RETURN:
* int rc
***********************************************************/
int ha_calpont_group_by_handler::end_scan()
{
DBUG_ENTER("ha_calpont_group_by_handler::end_scan");
int rc = ha_calpont_impl_group_by_end(this, table);
DBUG_RETURN(rc);
}
/*@brief create_columnstore_select_handler- Creates handler*/
/************************************************************
* DESCRIPTION:
* Creates a select handler if there is no non-equi JOIN, e.g
* t1.c1 > t2.c2 and logical OR in the filter predicates.
* More details in server/sql/select_handler.h
* PARAMETERS:
* thd - THD pointer.
* sel - SELECT_LEX* that describes the query.
* RETURN:
* select_handler if possible
* NULL in other case
***********************************************************/
static select_handler*
create_columnstore_select_handler(THD* thd, SELECT_LEX* select_lex)
{
ha_columnstore_select_handler* handler = NULL;
// MCOL-2178 Disable SP support in the select_handler for now.
// Check the session variable value to enable/disable use of
// select_handler
if (!get_select_handler(thd) || (thd->lex)->sphead)
{
return handler;
}
bool unsupported_feature = false;
// Select_handler use the short-cut that effectively disables
// INSERT..SELECT and LDI
if ( (thd->lex)->sql_command == SQLCOM_INSERT_SELECT
|| (thd->lex)->sql_command == SQLCOM_CREATE_TABLE )
{
unsupported_feature = true;
}
// Impossible HAVING or WHERE
// WIP replace with function call
if ( unsupported_feature
|| ( select_lex->having && select_lex->having_value == Item::COND_FALSE )
|| ( select_lex->cond_count > 0
&& select_lex->cond_value == Item::COND_FALSE ) )
{
unsupported_feature = true;
}
// Unsupported query check.
if ( !unsupported_feature )
{
// JOIN expression from WHERE, ON expressions
JOIN* join = select_lex->join;
Item_cond* where_icp = 0;
Item_cond* on_icp = 0;
if (join != 0)
{
where_icp = reinterpret_cast<Item_cond*>(join->conds);
}
if ( where_icp )
{
//where_icp->traverse_cond(check_walk, &unsupported_feature, Item::POSTFIX);
}
// Looking for JOIN with ON expression through
// TABLE_LIST in FROM until CS meets unsupported feature
TABLE_LIST* table_ptr = select_lex->get_table_list();
for (; !unsupported_feature && table_ptr; table_ptr = table_ptr->next_global)
{
if(table_ptr->on_expr)
{
on_icp = reinterpret_cast<Item_cond*>(table_ptr->on_expr);
//on_icp->traverse_cond(check_walk, &unsupported_feature, Item::POSTFIX);
}
}
// CROSS JOIN w/o conditions isn't supported until MCOL-301
// is ready.
if (join && join->table_count >= 2 && ( !where_icp && !on_icp ))
{
unsupported_feature = true;
}
}
if (!unsupported_feature)
{
handler = new ha_columnstore_select_handler(thd, select_lex);
mutate_optimizer_flags(thd);
}
return handler;
}
/***********************************************************
* DESCRIPTION:
* select_handler constructor
* PARAMETERS:
* thd - THD pointer.
* select_lex - sematic tree for the query.
***********************************************************/
ha_columnstore_select_handler::ha_columnstore_select_handler(THD *thd,
SELECT_LEX* select_lex)
: select_handler(thd, calpont_hton)
{
select = select_lex;
}
/***********************************************************
* DESCRIPTION:
* select_handler constructor
***********************************************************/
ha_columnstore_select_handler::~ha_columnstore_select_handler()
{
}
/*@brief Initiate the query for select_handler */
/***********************************************************
* DESCRIPTION:
* Execute the query and saves select table query.
* PARAMETERS:
*
* RETURN:
* rc as int
***********************************************************/
int ha_columnstore_select_handler::init_scan()
{
char query_buff[4096];
DBUG_ENTER("ha_columnstore_select_handler::init_scan");
// Save query for logging
String select_query(query_buff, sizeof(query_buff), thd->charset());
select_query.length(0);
select->print(thd, &select_query, QT_ORDINARY);
mcs_handler_info mhi = mcs_handler_info(static_cast<void*>(this), SELECT);
// this::table is the place for the result set
int rc = ha_cs_impl_pushdown_init(&mhi, table);
DBUG_RETURN(rc);
}
/*@brief Fetch next row for select_handler */
/***********************************************************
* DESCRIPTION:
* Fetches next row and saves it in the temp table
* PARAMETERS:
*
* RETURN:
* rc as int
*
***********************************************************/
int ha_columnstore_select_handler::next_row()
{
DBUG_ENTER("ha_columnstore_select_handler::next_row");
int rc = ha_calpont_impl_rnd_next(table->record[0], table);
DBUG_RETURN(rc);
}
/*@brief Finishes the scan and clean it up */
/***********************************************************
* DESCRIPTION:
* Finishes the scan for select handler
* PARAMETERS:
*
* RETURN:
* rc as int
*
***********************************************************/
int ha_columnstore_select_handler::end_scan()
{
DBUG_ENTER("ha_columnstore_select_handler::end_scan");
int rc = ha_calpont_impl_rnd_end(table, true);
DBUG_RETURN(rc);
}
void ha_columnstore_select_handler::print_error(int, unsigned long)
{}

View File

@ -0,0 +1,145 @@
/*
Copyright (c) 2019 MariaDB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#ifndef HA_MCS_PUSH
#define HA_MCS_PUSH
#include "idb_mysql.h"
#include "ha_calpont.h"
void mutate_optimizer_flags(THD *thd_);
void restore_optimizer_flags(THD *thd_);
enum mcs_handler_types_t
{
SELECT,
DERIVED,
GROUP_BY,
LEGACY
};
struct mcs_handler_info
{
mcs_handler_info() : hndl_ptr(NULL), hndl_type(LEGACY) { };
mcs_handler_info(mcs_handler_types_t type) : hndl_ptr(NULL), hndl_type(type) { };
mcs_handler_info(void* ptr, mcs_handler_types_t type) : hndl_ptr(ptr), hndl_type(type) { };
~mcs_handler_info() { };
void* hndl_ptr;
mcs_handler_types_t hndl_type;
};
/*@brief group_by_handler class*/
/***********************************************************
* DESCRIPTION:
* Provides server with group_by_handler API methods.
* One should read comments in server/sql/group_by_handler.h
* Attributes:
* select - attribute contains all GROUP BY, HAVING, ORDER items and calls it
* an extended SELECT list according to comments in
* server/sql/group_handler.cc.
* So the temporary table for
* select count(*) from b group by a having a > 3 order by a
* will have 4 columns not 1.
* However server ignores all NULLs used in
* GROUP BY, HAVING, ORDER.
* select_list_descr - contains Item description returned by Item->print()
* that is used in lookup for corresponding columns in
* extended SELECT list.
* table_list - contains all tables involved. Must be CS tables only.
* distinct - looks like a useless thing for now. Couldn't get it set by server.
* where - where items.
* group_by - group by ORDER items.
* order_by - order by ORDER items.
* having - having Item.
* Methods:
* init_scan - get plan and send it to ExeMgr. Get the execution result.
* next_row - get a row back from sm.
* end_scan - finish and clean the things up.
***********************************************************/
class ha_calpont_group_by_handler: public group_by_handler
{
public:
ha_calpont_group_by_handler(THD* thd_arg, Query* query);
~ha_calpont_group_by_handler();
int init_scan();
int next_row();
int end_scan();
List<Item>* select;
TABLE_LIST* table_list;
bool distinct;
Item* where;
ORDER* group_by;
ORDER* order_by;
Item* having;
};
/*@brief derived_handler class*/
/***********************************************************
* DESCRIPTION:
* derived_handler API methods. Could be used by the server
* tp process sub-queries.
* More details in server/sql/dervied_handler.h
* INFINIDB_SHARE* hton share
* tbl in the constructor is the list of the tables involved.
* Methods:
* init_scan - get plan and send it to ExeMgr. Get the execution result.
* next_row - get a row back from sm.
* end_scan - finish and clean the things up.
***********************************************************/
class ha_columnstore_derived_handler: public derived_handler
{
private:
INFINIDB_SHARE *share;
public:
ha_columnstore_derived_handler(THD* thd_arg, TABLE_LIST *tbl);
~ha_columnstore_derived_handler();
int init_scan();
int next_row();
int end_scan();
void print_error(int, unsigned long);
};
/*@brief select_handler class*/
/***********************************************************
* DESCRIPTION:
* select_handler API methods. Could be used by the server
* tp pushdown the whole query described by SELECT_LEX.
* More details in server/sql/select_handler.h
* INFINIDB_SHARE* hton share
* sel in the constructor is the semantic tree for the query.
* Methods:
* init_scan - get plan and send it to ExeMgr. Get the execution result.
* next_row - get a row back from sm.
* end_scan - finish and clean the things up.
***********************************************************/
class ha_columnstore_select_handler: public select_handler
{
private:
INFINIDB_SHARE *share;
public:
ha_columnstore_select_handler(THD* thd_arg, SELECT_LEX* sel);
~ha_columnstore_select_handler();
int init_scan();
int next_row();
int end_scan();
void print_error(int, unsigned long);
};
#endif

View File

@ -58,6 +58,48 @@ static MYSQL_THDVAR_ULONGLONG(
1 1
); );
// optimizer flags vault
static MYSQL_THDVAR_ULONGLONG(
original_optimizer_flags,
PLUGIN_VAR_NOSYSVAR | PLUGIN_VAR_NOCMDOPT,
"Vault for original optimizer flags. For internal usage.",
NULL,
NULL,
0,
0,
~0U,
1
);
static MYSQL_THDVAR_BOOL(
select_handler,
PLUGIN_VAR_NOCMDARG,
"Enable/Disable the MCS select_handler",
NULL,
NULL,
1
);
static MYSQL_THDVAR_BOOL(
derived_handler,
PLUGIN_VAR_NOCMDARG,
"Enable/Disable the MCS derived_handler",
NULL,
NULL,
1
);
static MYSQL_THDVAR_BOOL(
group_by_handler,
PLUGIN_VAR_NOCMDARG,
"Enable/Disable the MCS group_by_handler",
NULL,
NULL,
1
);
// legacy system variables // legacy system variables
static MYSQL_THDVAR_ULONG( static MYSQL_THDVAR_ULONG(
decimal_scale, decimal_scale,
@ -240,6 +282,10 @@ st_mysql_sys_var* mcs_system_variables[] =
{ {
MYSQL_SYSVAR(compression_type), MYSQL_SYSVAR(compression_type),
MYSQL_SYSVAR(fe_conn_info_ptr), MYSQL_SYSVAR(fe_conn_info_ptr),
MYSQL_SYSVAR(original_optimizer_flags),
MYSQL_SYSVAR(select_handler),
MYSQL_SYSVAR(derived_handler),
MYSQL_SYSVAR(group_by_handler),
MYSQL_SYSVAR(decimal_scale), MYSQL_SYSVAR(decimal_scale),
MYSQL_SYSVAR(use_decimal_scale), MYSQL_SYSVAR(use_decimal_scale),
MYSQL_SYSVAR(ordered_only), MYSQL_SYSVAR(ordered_only),
@ -275,14 +321,47 @@ void set_fe_conn_info_ptr(void* ptr, THD* thd)
THDVAR(current_thd, fe_conn_info_ptr) = (uint64_t)(ptr); THDVAR(current_thd, fe_conn_info_ptr) = (uint64_t)(ptr);
} }
bool get_use_legacy_sysvars(THD* thd) ulonglong get_original_optimizer_flags(THD* thd)
{ {
return ( thd == NULL ) ? false : THDVAR(thd, use_legacy_sysvars); return ( current_thd == NULL && thd == NULL ) ? NULL :
THDVAR(current_thd, original_optimizer_flags);
} }
void set_use_legacy_sysvars(THD* thd, bool value) void set_original_optimizer_flags(ulonglong ptr, THD* thd)
{ {
THDVAR(thd, use_legacy_sysvars) = value; if ( current_thd == NULL && thd == NULL)
{
return;
}
THDVAR(current_thd, original_optimizer_flags) = (uint64_t)(ptr);
}
bool get_select_handler(THD* thd)
{
return ( thd == NULL ) ? false : THDVAR(thd, select_handler);
}
void set_select_handler(THD* thd, bool value)
{
THDVAR(thd, select_handler) = value;
}
bool get_derived_handler(THD* thd)
{
return ( thd == NULL ) ? false : THDVAR(thd, derived_handler);
}
void set_derived_handler(THD* thd, bool value)
{
THDVAR(thd, derived_handler) = value;
}
bool get_group_by_handler(THD* thd)
{
return ( thd == NULL ) ? false : THDVAR(thd, group_by_handler);
}
void set_group_by_handler(THD* thd, bool value)
{
THDVAR(thd, group_by_handler) = value;
} }
void set_compression_type(THD* thd, ulong value) void set_compression_type(THD* thd, ulong value)
@ -296,225 +375,135 @@ mcs_compression_type_t get_compression_type(THD* thd) {
bool get_use_decimal_scale(THD* thd) bool get_use_decimal_scale(THD* thd)
{ {
if(get_use_legacy_sysvars(thd)) return ( thd == NULL ) ? false : THDVAR(thd, use_decimal_scale);
return ( thd == NULL ) ? false : thd->variables.infinidb_use_decimal_scale;
else
return ( thd == NULL ) ? false : THDVAR(thd, use_decimal_scale);
} }
void set_use_decimal_scale(THD* thd, bool value) void set_use_decimal_scale(THD* thd, bool value)
{ {
if(get_use_legacy_sysvars(thd)) THDVAR(thd, use_decimal_scale) = value;
thd->variables.infinidb_use_decimal_scale = value;
else
THDVAR(thd, use_decimal_scale) = value;
} }
ulong get_decimal_scale(THD* thd) ulong get_decimal_scale(THD* thd)
{ {
if(get_use_legacy_sysvars(thd)) return ( thd == NULL ) ? 0 : THDVAR(thd, decimal_scale);
return ( thd == NULL ) ? 0 : thd->variables.infinidb_decimal_scale;
else
return ( thd == NULL ) ? 0 : THDVAR(thd, decimal_scale);
} }
void set_decimal_scale(THD* thd, ulong value) void set_decimal_scale(THD* thd, ulong value)
{ {
if(get_use_legacy_sysvars(thd)) THDVAR(thd, decimal_scale) = value;
thd->variables.infinidb_decimal_scale = value;
else
THDVAR(thd, decimal_scale) = value;
} }
bool get_ordered_only(THD* thd) bool get_ordered_only(THD* thd)
{ {
if(get_use_legacy_sysvars(thd)) return ( thd == NULL ) ? false : THDVAR(thd, ordered_only);
return ( thd == NULL ) ? false : thd->variables.infinidb_ordered_only;
else
return ( thd == NULL ) ? false : THDVAR(thd, ordered_only);
} }
void set_ordered_only(THD* thd, bool value) void set_ordered_only(THD* thd, bool value)
{ {
if(get_use_legacy_sysvars(thd)) THDVAR(thd, ordered_only) = value;
thd->variables.infinidb_ordered_only = value;
else
THDVAR(thd, ordered_only) = value;
} }
ulong get_string_scan_threshold(THD* thd) ulong get_string_scan_threshold(THD* thd)
{ {
if(get_use_legacy_sysvars(thd)) return ( thd == NULL ) ? 0 : THDVAR(thd, string_scan_threshold);
return ( thd == NULL ) ? 0 : thd->variables.infinidb_string_scan_threshold;
else
return ( thd == NULL ) ? 0 : THDVAR(thd, string_scan_threshold);
} }
void set_string_scan_threshold(THD* thd, ulong value) void set_string_scan_threshold(THD* thd, ulong value)
{ {
if(get_use_legacy_sysvars(thd)) THDVAR(thd, string_scan_threshold) = value;
thd->variables.infinidb_string_scan_threshold = value;
else
THDVAR(thd, string_scan_threshold) = value;
} }
ulong get_stringtable_threshold(THD* thd) ulong get_stringtable_threshold(THD* thd)
{ {
if(get_use_legacy_sysvars(thd)) return ( thd == NULL ) ? 0 : THDVAR(thd, stringtable_threshold);
return ( thd == NULL ) ? 0 : thd->variables.infinidb_stringtable_threshold;
else
return ( thd == NULL ) ? 0 : THDVAR(thd, stringtable_threshold);
} }
void set_stringtable_threshold(THD* thd, ulong value) void set_stringtable_threshold(THD* thd, ulong value)
{ {
if(get_use_legacy_sysvars(thd)) THDVAR(thd, stringtable_threshold) = value;
thd->variables.infinidb_stringtable_threshold = value;
else
THDVAR(thd, stringtable_threshold) = value;
} }
ulong get_diskjoin_smallsidelimit(THD* thd) ulong get_diskjoin_smallsidelimit(THD* thd)
{ {
if(get_use_legacy_sysvars(thd)) return ( thd == NULL ) ? 0 : THDVAR(thd, diskjoin_smallsidelimit);
return ( thd == NULL ) ? 0 : thd->variables.infinidb_diskjoin_smallsidelimit;
else
return ( thd == NULL ) ? 0 : THDVAR(thd, diskjoin_smallsidelimit);
} }
void set_diskjoin_smallsidelimit(THD* thd, ulong value) void set_diskjoin_smallsidelimit(THD* thd, ulong value)
{ {
if(get_use_legacy_sysvars(thd)) THDVAR(thd, diskjoin_smallsidelimit) = value;
thd->variables.infinidb_diskjoin_smallsidelimit = value;
else
THDVAR(thd, diskjoin_smallsidelimit) = value;
} }
ulong get_diskjoin_largesidelimit(THD* thd) ulong get_diskjoin_largesidelimit(THD* thd)
{ {
if(get_use_legacy_sysvars(thd)) return ( thd == NULL ) ? 0 : THDVAR(thd, diskjoin_largesidelimit);
return ( thd == NULL ) ? 0 : thd->variables.infinidb_diskjoin_largesidelimit;
else
return ( thd == NULL ) ? 0 : THDVAR(thd, diskjoin_largesidelimit);
} }
void set_diskjoin_largesidelimit(THD* thd, ulong value) void set_diskjoin_largesidelimit(THD* thd, ulong value)
{ {
if(get_use_legacy_sysvars(thd)) THDVAR(thd, diskjoin_largesidelimit) = value;
thd->variables.infinidb_diskjoin_largesidelimit = value;
else
THDVAR(thd, diskjoin_largesidelimit) = value;
} }
ulong get_diskjoin_bucketsize(THD* thd) ulong get_diskjoin_bucketsize(THD* thd)
{ {
if(get_use_legacy_sysvars(thd)) return ( thd == NULL ) ? 0 : THDVAR(thd, diskjoin_bucketsize);
return ( thd == NULL ) ? 0 : thd->variables.infinidb_diskjoin_bucketsize;
else
return ( thd == NULL ) ? 0 : THDVAR(thd, diskjoin_bucketsize);
} }
void set_diskjoin_bucketsize(THD* thd, ulong value) void set_diskjoin_bucketsize(THD* thd, ulong value)
{ {
if(get_use_legacy_sysvars(thd)) THDVAR(thd, diskjoin_bucketsize) = value;
thd->variables.infinidb_diskjoin_bucketsize = value;
else
THDVAR(thd, diskjoin_bucketsize) = value;
} }
ulong get_um_mem_limit(THD* thd) ulong get_um_mem_limit(THD* thd)
{ {
if(get_use_legacy_sysvars(thd)) return ( thd == NULL ) ? 0 : THDVAR(thd, um_mem_limit);
return ( thd == NULL ) ? 0 : thd->variables.infinidb_um_mem_limit;
else
return ( thd == NULL ) ? 0 : THDVAR(thd, um_mem_limit);
} }
void set_um_mem_limit(THD* thd, ulong value) void set_um_mem_limit(THD* thd, ulong value)
{ {
if(get_use_legacy_sysvars(thd)) THDVAR(thd, um_mem_limit) = value;
thd->variables.infinidb_um_mem_limit = value;
else
THDVAR(thd, um_mem_limit) = value;
} }
bool get_varbin_always_hex(THD* thd) bool get_varbin_always_hex(THD* thd)
{ {
if(get_use_legacy_sysvars(thd)) return ( thd == NULL ) ? false : THDVAR(thd, varbin_always_hex);
return ( thd == NULL ) ? false : thd->variables.infinidb_varbin_always_hex;
else
return ( thd == NULL ) ? false : THDVAR(thd, varbin_always_hex);
} }
void set_varbin_always_hex(THD* thd, bool value) void set_varbin_always_hex(THD* thd, bool value)
{ {
if(get_use_legacy_sysvars(thd)) THDVAR(thd, varbin_always_hex) = value;
thd->variables.infinidb_varbin_always_hex = value;
else
THDVAR(thd, varbin_always_hex) = value;
} }
bool get_double_for_decimal_math(THD* thd) bool get_double_for_decimal_math(THD* thd)
{ {
if(get_use_legacy_sysvars(thd)) return ( thd == NULL ) ? false : THDVAR(thd, double_for_decimal_math);
return ( thd == NULL ) ? false : thd->variables.infinidb_double_for_decimal_math;
else
return ( thd == NULL ) ? false : THDVAR(thd, double_for_decimal_math);
} }
void set_double_for_decimal_math(THD* thd, bool value) void set_double_for_decimal_math(THD* thd, bool value)
{ {
if(get_use_legacy_sysvars(thd)) THDVAR(thd, double_for_decimal_math) = value;
thd->variables.infinidb_double_for_decimal_math = value;
else
THDVAR(thd, double_for_decimal_math) = value;
} }
ulong get_local_query(THD* thd) ulong get_local_query(THD* thd)
{ {
if(get_use_legacy_sysvars(thd)) return ( thd == NULL ) ? 0 : THDVAR(thd, local_query);
return ( thd == NULL ) ? 0 : thd->variables.infinidb_local_query;
else
return ( thd == NULL ) ? 0 : THDVAR(thd, local_query);
} }
void set_local_query(THD* thd, ulong value) void set_local_query(THD* thd, ulong value)
{ {
if(get_use_legacy_sysvars(thd)) THDVAR(thd, local_query) = value;
thd->variables.infinidb_local_query = value;
else
THDVAR(thd, local_query) = value;
} }
bool get_use_import_for_batchinsert(THD* thd) bool get_use_import_for_batchinsert(THD* thd)
{ {
if(get_use_legacy_sysvars(thd)) return ( thd == NULL ) ? false : THDVAR(thd, use_import_for_batchinsert);
return ( thd == NULL ) ? false : thd->variables.infinidb_use_import_for_batchinsert;
else
return ( thd == NULL ) ? false : THDVAR(thd, use_import_for_batchinsert);
} }
void set_use_import_for_batchinsert(THD* thd, bool value) void set_use_import_for_batchinsert(THD* thd, bool value)
{ {
if(get_use_legacy_sysvars(thd)) THDVAR(thd, use_import_for_batchinsert) = value;
thd->variables.infinidb_use_import_for_batchinsert = value;
else
THDVAR(thd, use_import_for_batchinsert) = value;
} }
ulong get_import_for_batchinsert_delimiter(THD* thd) ulong get_import_for_batchinsert_delimiter(THD* thd)
{ {
if(get_use_legacy_sysvars(thd)) return ( thd == NULL ) ? 0 : THDVAR(thd, import_for_batchinsert_delimiter);
return ( thd == NULL ) ? 0 : thd->variables.infinidb_import_for_batchinsert_delimiter;
else
return ( thd == NULL ) ? 0 : THDVAR(thd, import_for_batchinsert_delimiter);
} }
void set_import_for_batchinsert_delimiter(THD* thd, ulong value) void set_import_for_batchinsert_delimiter(THD* thd, ulong value)
{ {
if(get_use_legacy_sysvars(thd)) THDVAR(thd, import_for_batchinsert_delimiter) = value;
thd->variables.infinidb_import_for_batchinsert_delimiter = value;
else
THDVAR(thd, import_for_batchinsert_delimiter) = value;
} }
ulong get_import_for_batchinsert_enclosed_by(THD* thd) ulong get_import_for_batchinsert_enclosed_by(THD* thd)
{ {
if(get_use_legacy_sysvars(thd)) return ( thd == NULL ) ? 0 : THDVAR(thd, import_for_batchinsert_enclosed_by);
return ( thd == NULL ) ? 0 : thd->variables.infinidb_import_for_batchinsert_enclosed_by;
else
return ( thd == NULL ) ? 0 : THDVAR(thd, import_for_batchinsert_enclosed_by);
} }
void set_import_for_batchinsert_enclosed_by(THD* thd, ulong value) void set_import_for_batchinsert_enclosed_by(THD* thd, ulong value)
{ {
if(get_use_legacy_sysvars(thd)) THDVAR(thd, import_for_batchinsert_enclosed_by) = value;
thd->variables.infinidb_import_for_batchinsert_enclosed_by = value;
else
THDVAR(thd, import_for_batchinsert_enclosed_by) = value;
} }

View File

@ -40,8 +40,17 @@ void set_compression_type(THD* thd, ulong value);
void* get_fe_conn_info_ptr(THD* thd = NULL); void* get_fe_conn_info_ptr(THD* thd = NULL);
void set_fe_conn_info_ptr(void* ptr, THD* thd = NULL); void set_fe_conn_info_ptr(void* ptr, THD* thd = NULL);
bool get_use_legacy_sysvars(THD* thd); ulonglong get_original_optimizer_flags(THD* thd = NULL);
void set_use_legacy_sysvars(THD* thd, bool value); void set_original_optimizer_flags(ulonglong ptr, THD* thd = NULL);
bool get_select_handler(THD* thd);
void set_select_handler(THD* thd, bool value);
bool get_derived_handler(THD* thd);
void set_derived_handler(THD* thd, bool value);
bool get_group_by_handler(THD* thd);
void set_group_by_handler(THD* thd, bool value);
bool get_use_decimal_scale(THD* thd); bool get_use_decimal_scale(THD* thd);
void set_use_decimal_scale(THD* thd, bool value); void set_use_decimal_scale(THD* thd, bool value);

View File

@ -45,7 +45,10 @@ namespace cal_impl_if
class SubQuery class SubQuery
{ {
public: public:
SubQuery(gp_walk_info& gwip) : fGwip(gwip), fCorrelated(false) {} SubQuery(gp_walk_info& gwip) :
fGwip(gwip),
fCorrelated(false)
{}
virtual ~SubQuery() {} virtual ~SubQuery() {}
virtual gp_walk_info& gwip() const virtual gp_walk_info& gwip() const
{ {
@ -178,7 +181,7 @@ class FromSubQuery : public SubQuery
{ {
public: public:
FromSubQuery(gp_walk_info&); FromSubQuery(gp_walk_info&);
FromSubQuery(gp_walk_info&, SELECT_LEX* fromSub); FromSubQuery(gp_walk_info&, SELECT_LEX* fromSub, bool isPushdownHand=false);
~FromSubQuery(); ~FromSubQuery();
const SELECT_LEX* fromSub() const const SELECT_LEX* fromSub() const
{ {
@ -200,6 +203,7 @@ public:
private: private:
SELECT_LEX* fFromSub; SELECT_LEX* fFromSub;
std::string fAlias; std::string fAlias;
bool fPushdownHand;
}; };
class SelectSubQuery : public SubQuery class SelectSubQuery : public SubQuery

View File

@ -102,14 +102,17 @@ void View::transform()
CalpontSystemCatalog::TableAliasName tn = make_aliasview("", "", alias, viewName); CalpontSystemCatalog::TableAliasName tn = make_aliasview("", "", alias, viewName);
gwi.tbList.push_back(tn); gwi.tbList.push_back(tn);
gwi.tableMap[tn] = make_pair(0, table_ptr); gwi.tableMap[tn] = make_pair(0, table_ptr);
gwi.thd->infinidb_vtable.isUnion = true; //by-pass the 2nd pass of rnd_init // TODO MCOL-2178 isUnion member only assigned, never used
// MIGR::infinidb_vtable.isUnion = true; //by-pass the 2nd pass of rnd_init
} }
else if (table_ptr->view) else if (table_ptr->view)
{ {
// for nested view, the view name is vout.vin... format // for nested view, the view name is vout.vin... format
CalpontSystemCatalog::TableAliasName tn = make_aliasview(table_ptr->db.str, table_ptr->table_name.str, table_ptr->alias.str, viewName); CalpontSystemCatalog::TableAliasName tn = make_aliasview(table_ptr->db.str, table_ptr->table_name.str, table_ptr->alias.str, viewName);
gwi.viewName = make_aliastable(table_ptr->db.str, table_ptr->table_name.str, viewName); gwi.viewName = make_aliastable(table_ptr->db.str, table_ptr->table_name.str, viewName);
View* view = new View(table_ptr->view->select_lex, &gwi); // WIP MCOL-2178 CS could mess with the SELECT_LEX unit so better
// use a copy.
View* view = new View(*table_ptr->view->first_select_lex(), &gwi);
view->viewName(gwi.viewName); view->viewName(gwi.viewName);
gwi.viewList.push_back(view); gwi.viewList.push_back(view);
view->transform(); view->transform();
@ -117,7 +120,7 @@ void View::transform()
else else
{ {
// check foreign engine tables // check foreign engine tables
bool infiniDB = (table_ptr->table ? isInfiniDB(table_ptr->table) : true); bool infiniDB = (table_ptr->table ? isMCSTable(table_ptr->table) : true);
// trigger system catalog cache // trigger system catalog cache
if (infiniDB) if (infiniDB)

View File

@ -70,6 +70,8 @@ template <class T> bool isnan(T);
#include "item_windowfunc.h" #include "item_windowfunc.h"
#include "sql_cte.h" #include "sql_cte.h"
#include "tztime.h" #include "tztime.h"
#include "derived_handler.h"
#include "select_handler.h"
// Now clean up the pollution as best we can... // Now clean up the pollution as best we can...
#undef min #undef min

View File

@ -47,21 +47,21 @@ group_concat_max_len=512
sql_mode="ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION" sql_mode="ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"
# Enable compression by default on create, set to 0 to turn off # Enable compression by default on create, set to 0 to turn off
infinidb_compression_type=2 #columnstore_compression_type=2
# Default for string table threshhold # Default for string table threshhold
infinidb_stringtable_threshold=20 #columnstore_stringtable_threshold=20
# infinidb local query flag # infinidb local query flag
infinidb_local_query=0 #columnstore_local_query=0
infinidb_diskjoin_smallsidelimit=0 #columnstore_diskjoin_smallsidelimit=0
infinidb_diskjoin_largesidelimit=0 #columnstore_diskjoin_largesidelimit=0
infinidb_diskjoin_bucketsize=100 #columnstore_diskjoin_bucketsize=100
infinidb_um_mem_limit=0 #columnstore_um_mem_limit=0
infinidb_use_import_for_batchinsert=1 #columnstore_use_import_for_batchinsert=1
infinidb_import_for_batchinsert_delimiter=7 #columnstore_import_for_batchinsert_delimiter=7
basedir = /usr/local/mariadb/columnstore/mysql/ basedir = /usr/local/mariadb/columnstore/mysql/
character-sets-dir = /usr/local/mariadb/columnstore/mysql/share/charsets/ character-sets-dir = /usr/local/mariadb/columnstore/mysql/share/charsets/

View File

@ -373,17 +373,16 @@ status_t
tpl_close ( cpsm_tplh_t* ntplh, tpl_close ( cpsm_tplh_t* ntplh,
cpsm_conhdl_t** conn_hdl, cpsm_conhdl_t** conn_hdl,
QueryStats& stats, QueryStats& stats,
bool ask_4_stats,
bool clear_scan_ctx) bool clear_scan_ctx)
{ {
cpsm_conhdl_t* hndl = *conn_hdl; cpsm_conhdl_t* hndl = *conn_hdl;
#if IDB_SM_DEBUG
SMDEBUGLOG << "tpl_close: hndl" << hndl << " ntplh " << ntplh; SMDEBUGLOG << "tpl_close: hndl" << hndl << " ntplh " << ntplh;
if (ntplh) if (ntplh)
SMDEBUGLOG << " tableid: " << ntplh->tableid; SMDEBUGLOG << " tableid: " << ntplh->tableid;
SMDEBUGLOG << endl; SMDEBUGLOG << endl;
#endif
delete ntplh; delete ntplh;
// determine end of result set and end of statement execution // determine end of result set and end of statement execution
@ -391,57 +390,60 @@ tpl_close ( cpsm_tplh_t* ntplh,
{ {
// Get the query stats // Get the query stats
ByteStream bs; ByteStream bs;
ByteStream::quadbyte qb = 3; // Ask for a stats only if a user explicitly asks
bs << qb; if(ask_4_stats)
hndl->write(bs); {
ByteStream::quadbyte qb = 3;
bs << qb;
hndl->write(bs);
}
// MCOL-1601 Dispose of unused empty RowGroup // MCOL-1601 Dispose of unused empty RowGroup
if (clear_scan_ctx) if (clear_scan_ctx)
{ {
std::cout << "tpl_close() clear_scan_ctx read" << std::endl;
bs = hndl->exeMgr->read(); bs = hndl->exeMgr->read();
} }
#if IDB_SM_DEBUG
SMDEBUGLOG << "tpl_close hndl->exeMgr: " << hndl->exeMgr << endl; SMDEBUGLOG << "tpl_close hndl->exeMgr: " << hndl->exeMgr << endl;
#endif
//keep reading until we get a string //keep reading until we get a string
//TODO: really need to fix this! Why is ExeMgr sending other stuff? // Ask for a stats only if a user explicitly asks
for (int tries = 0; tries < 10; tries++) if(ask_4_stats)
{ {
bs = hndl->exeMgr->read(); for (int tries = 0; tries < 10; tries++)
{
bs = hndl->exeMgr->read();
if (bs.length() == 0) break; if (bs.length() == 0) break;
try try
{ {
bs >> hndl->queryStats; bs >> hndl->queryStats;
bs >> hndl->extendedStats; bs >> hndl->extendedStats;
bs >> hndl->miniStats; bs >> hndl->miniStats;
stats.unserialize(bs); stats.unserialize(bs);
stats.setEndTime(); stats.setEndTime();
stats.insert(); stats.insert();
break; break;
}
catch (IDBExcept&)
{
// @bug4732
end_query(hndl);
throw;
}
catch (...)
{
// querystats messed up. close connection.
// no need to throw for querystats protocol error, like for tablemode.
SMDEBUGLOG << "tpl_close() exception whilst getting stats" << endl;
end_query(hndl);
sm_cleanup(hndl);
*conn_hdl = 0;
return STATUS_OK;
//throw runtime_error(string("tbl_close catch exception: ") + e.what());
}
} }
catch (IDBExcept&) } //stats
{
// @bug4732
end_query(hndl);
throw;
}
catch (...)
{
// querystats messed up. close connection.
// no need to throw for querystats protocol error, like for tablemode.
#if IDB_SM_DEBUG
SMDEBUGLOG << "tpl_close() exception whilst getting stats" << endl;
#endif
end_query(hndl);
sm_cleanup(hndl);
*conn_hdl = 0;
return STATUS_OK;
//throw runtime_error(string("tbl_close catch exception: ") + e.what());
}
}
end_query(hndl); end_query(hndl);
} }

View File

@ -282,7 +282,7 @@ extern status_t tpl_open(tableid_t, cpsm_tplh_t*, cpsm_conhdl_t*);
extern status_t tpl_scan_open(tableid_t, sp_cpsm_tplsch_t&, cpsm_conhdl_t*); extern status_t tpl_scan_open(tableid_t, sp_cpsm_tplsch_t&, cpsm_conhdl_t*);
extern status_t tpl_scan_fetch(sp_cpsm_tplsch_t&, cpsm_conhdl_t*, int* k = 0); extern status_t tpl_scan_fetch(sp_cpsm_tplsch_t&, cpsm_conhdl_t*, int* k = 0);
extern status_t tpl_scan_close(sp_cpsm_tplsch_t&); extern status_t tpl_scan_close(sp_cpsm_tplsch_t&);
extern status_t tpl_close(cpsm_tplh_t*, cpsm_conhdl_t**, querystats::QueryStats& stats, bool clear_scan_ctx = false); extern status_t tpl_close(cpsm_tplh_t*, cpsm_conhdl_t**, querystats::QueryStats& stats, bool ask_4_stats, bool clear_scan_ctx = false);
} }

View File

@ -1591,6 +1591,7 @@ RowGroup& RowGroup::operator+=(const RowGroup& rhs)
} }
hasLongStringField = rhs.hasLongStringField || hasLongStringField; hasLongStringField = rhs.hasLongStringField || hasLongStringField;
useStringTable = rhs.useStringTable || useStringTable;
offsets = (useStringTable ? &stOffsets[0] : &oldOffsets[0]); offsets = (useStringTable ? &stOffsets[0] : &oldOffsets[0]);
return *this; return *this;