1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-2178 Connector code now uses separate hton for columnstore engine.

CS now uses hton->close_connection() method to release all FEP
    connections from MDB to ExeMgr.

    Refactor fetchNextRow() to remove decimal and double precision
    changes.
This commit is contained in:
Roman Nozdrin
2019-04-25 11:01:57 +03:00
committed by Gagan Goel
parent 5409eed6f5
commit c297ceb6c1
3 changed files with 46 additions and 78 deletions

View File

@ -34,6 +34,7 @@ static int calpont_commit(handlerton* hton, THD* thd, bool all);
static int calpont_rollback(handlerton* hton, THD* thd, bool all);
static int calpont_close_connection ( handlerton* hton, THD* thd );
handlerton* calpont_hton;
handlerton* mcs_hton;
// handlers creation function for hton.
// Look into ha_mcs_pushdown.* for more details.
@ -79,6 +80,7 @@ static uchar* calpont_get_key(INFINIDB_SHARE* share, size_t* length,
return (uchar*) share->table_name;
}
// This one is unused
int calpont_discover(handlerton* hton, THD* thd, TABLE_SHARE* share)
{
DBUG_ENTER("calpont_discover");
@ -105,6 +107,7 @@ int calpont_discover(handlerton* hton, THD* thd, TABLE_SHARE* share)
DBUG_RETURN(my_errno);
}
// This f() is also unused
int calpont_discover_existence(handlerton* hton, const char* db,
const char* table_name)
{
@ -127,24 +130,24 @@ static int columnstore_init_func(void* p)
fprintf(stderr, "Columnstore: Started; Version: %s-%s\n", columnstore_version.c_str(), columnstore_release.c_str());
calpont_hton = (handlerton*)p;
mcs_hton = (handlerton*)p;
#ifndef _MSC_VER
(void) pthread_mutex_init(&calpont_mutex, MY_MUTEX_INIT_FAST);
#endif
(void) my_hash_init(&calpont_open_tables, system_charset_info, 32, 0, 0,
(my_hash_get_key) calpont_get_key, 0, 0);
calpont_hton->state = SHOW_OPTION_YES;
calpont_hton->create = calpont_create_handler;
calpont_hton->flags = HTON_CAN_RECREATE;
// calpont_hton->discover_table= calpont_discover;
// calpont_hton->discover_table_existence= calpont_discover_existence;
calpont_hton->commit = calpont_commit;
calpont_hton->rollback = calpont_rollback;
calpont_hton->close_connection = calpont_close_connection;
calpont_hton->create_group_by = create_calpont_group_by_handler;
calpont_hton->create_derived = create_columnstore_derived_handler;
calpont_hton->create_select = create_columnstore_select_handler;
mcs_hton->state = SHOW_OPTION_YES;
mcs_hton->create = calpont_create_handler;
mcs_hton->flags = HTON_CAN_RECREATE;
// mcs_hton->discover_table= calpont_discover;
// mcs_hton->discover_table_existence= calpont_discover_existence;
mcs_hton->commit = calpont_commit;
mcs_hton->rollback = calpont_rollback;
mcs_hton->close_connection = calpont_close_connection;
mcs_hton->create_group_by = create_calpont_group_by_handler;
mcs_hton->create_derived = create_columnstore_derived_handler;
mcs_hton->create_select = create_columnstore_select_handler;
DBUG_RETURN(0);
}
@ -282,10 +285,6 @@ int ha_calpont::open(const char* name, int mode, uint32_t test_if_locked)
{
DBUG_ENTER("ha_calpont::open");
//if (!(share = get_share(name, table)))
// DBUG_RETURN(1);
//thr_lock_data_init(&share->lock,&lock,NULL);
int rc = ha_calpont_impl_open(name, mode, test_if_locked);
DBUG_RETURN(rc);
@ -311,7 +310,6 @@ int ha_calpont::open(const char* name, int mode, uint32_t test_if_locked)
int ha_calpont::close(void)
{
DBUG_ENTER("ha_calpont::close");
//DBUG_RETURN(free_share(share));
int rc = ha_calpont_impl_close();

View File

@ -22,6 +22,7 @@
#include "ha_mcs_sysvars.h"
extern handlerton* calpont_hton;
extern handlerton* mcs_hton;
/** @brief
INFINIDB_SHARE is a structure that will be shared among all open handlers.

View File

@ -577,10 +577,6 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci, bool h
* At a later date we should set this more intelligently
* based on the result set.
*/
/* MCOL-683: UTF-8 datetime no msecs is 57, this sometimes happens! */
// if (((*f)->field_length > 19) && ((*f)->field_length != 57))
// (*f)->field_length = strlen(tmp);
Field_varstring* f2 = (Field_varstring*)*f;
f2->store(tmp, strlen(tmp), f2->charset());
break;
@ -742,29 +738,18 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci, bool h
if (dl == std::numeric_limits<float>::infinity())
continue;
//int64_t* icvp = (int64_t*)&dl;
//intColVal = *icvp;
Field_float* f2 = (Field_float*)*f;
// bug 3485, reserve enough space for the longest float value
// -3.402823466E+38 to -1.175494351E-38, 0, and
// 1.175494351E-38 to 3.402823466E+38.
(*f)->field_length = 40;
//float float_val = *(float*)(&value);
//f2->store(float_val);
// WIP MCOL-2178
//if (f2->decimals() < (uint32_t)row.getScale(s))
//f2->dec = (uint32_t)row.getScale(s);
f2->store(dl);
if ((*f)->null_ptr)
*(*f)->null_ptr &= ~(*f)->null_bit;
break;
//storeNumericField(f, intColVal, colType);
//break;
}
case CalpontSystemCatalog::DOUBLE:
@ -781,30 +766,12 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci, bool h
// 2.2250738585072014E-308 to 1.7976931348623157E+308.
(*f)->field_length = 40;
//double double_val = *(double*)(&value);
//f2->store(double_val);
// WIP MCOL-2178
/*
if ((f2->decimals() == DECIMAL_NOT_SPECIFIED && row.getScale(s) > 0)
|| f2->decimals() < row.getScale(s))
{
f2->dec = row.getScale(s);
}*/
f2->store(dl);
if ((*f)->null_ptr)
*(*f)->null_ptr &= ~(*f)->null_bit;
break;
//int64_t* icvp = (int64_t*)&dl;
//intColVal = *icvp;
//storeNumericField(f, intColVal, colType);
//break;
}
case CalpontSystemCatalog::LONGDOUBLE:
@ -821,12 +788,6 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci, bool h
{
char buf[310];
Field_new_decimal* f2 = (Field_new_decimal*)*f;
if ((f2->decimals() == DECIMAL_NOT_SPECIFIED && row.getScale(s) > 0)
|| f2->decimals() < row.getScale(s))
{
f2->dec = row.getScale(s);
}
// dl /= pow(10.0, (double)f2->dec);
snprintf(buf, 310, "%.20Lg", dl);
f2->store(buf, strlen(buf), f2->charset());
if ((*f)->null_ptr)
@ -842,12 +803,6 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci, bool h
// 2.2250738585072014E-308 to 1.7976931348623157E+308.
(*f)->field_length = 310;
if ((f2->decimals() == DECIMAL_NOT_SPECIFIED && row.getScale(s) > 0)
|| f2->decimals() < row.getScale(s))
{
f2->dec = row.getScale(s);
}
f2->store(static_cast<double>(dl));
if ((*f)->null_ptr)
*(*f)->null_ptr &= ~(*f)->null_bit;
@ -2368,8 +2323,15 @@ int ha_calpont_impl_rnd_init(TABLE* table)
MIGR::infinidb_vtable.vtable_state = MIGR::INFINIDB_ERROR;
return ER_INTERNAL_ERROR;
}
#endif
// Set this to close all outstanding FEP connections on
// client disconnect in handlerton::closecon_handlerton().
if ( !thd_get_ha_data(thd, mcs_hton))
{
thd_set_ha_data(thd, mcs_hton, reinterpret_cast<void*>(0x42));
}
// prevent "create table as select" from running on slave
MIGR::infinidb_vtable.hasInfiniDBTable = true;
@ -2533,9 +2495,10 @@ int ha_calpont_impl_rnd_init(TABLE* table)
}
// vtable mode
else
// The whole section must be useless now.
{
//if (!ci->cal_conn_hndl || MIGR::infinidb_vtable.vtable_state == MIGR::INFINIDB_CREATE_VTABLE)
if ( MIGR::infinidb_vtable.vtable_state == MIGR::INFINIDB_CREATE_VTABLE)
if ( !ci->cal_conn_hndl ||
MIGR::infinidb_vtable.vtable_state == MIGR::INFINIDB_CREATE_VTABLE)
{
ci->stats.reset(); // reset query stats
ci->stats.setStartTime();
@ -2886,14 +2849,14 @@ int ha_calpont_impl_rnd_init(TABLE* table)
return 0;
error:
// CS doesn't need to close the actual sockets
// b/c it tries to reuse it running next query.
if (ci->cal_conn_hndl)
{
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
// do we need to close all connection handle of the table map?
return ER_INTERNAL_ERROR;
internal_error:
@ -3038,6 +3001,7 @@ int ha_calpont_impl_rnd_end(TABLE* table, bool is_pushdown_hand)
MIGR::infinidb_vtable.isNewQuery = true;
// WIP MCOL-2178
// Workaround because CS doesn't reset isUnion in a normal way.
if (is_pushdown_hand)
{
@ -3046,15 +3010,13 @@ int ha_calpont_impl_rnd_end(TABLE* table, bool is_pushdown_hand)
if (get_fe_conn_info_ptr() != NULL)
ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
// WIP MCOL-2178. Won't see this state anymore.
if (MIGR::infinidb_vtable.vtable_state == MIGR::INFINIDB_ORDER_BY )
{
MIGR::infinidb_vtable.vtable_state = MIGR::INFINIDB_SELECT_VTABLE; // flip back to normal state
return rc;
}
// if (MIGR::infinidb_vtable.vtable_state == MIGR::INFINIDB_REDO_PHASE1)
// return rc;
if ( (thd->lex)->sql_command == SQLCOM_ALTER_TABLE )
return rc;
@ -3167,6 +3129,8 @@ int ha_calpont_impl_rnd_end(TABLE* table, bool is_pushdown_hand)
// reset expressionId just in case
ci->expressionId = 0;
thd_set_ha_data(thd, mcs_hton, reinterpret_cast<void*>(ci));
return rc;
}
@ -5272,12 +5236,10 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
* Execute the query and saves derived table query.
* There is an extra handler argument so I ended up with a
* new init function. The code is a copy of
* ha_calpont_impl_rnd_init() mostly. We should come up with
* a semi-universal structure that allows to save any
* extra data.
* ha_calpont_impl_rnd_init() mostly.
* PARAMETERS:
* void* handler either select_ or derived_handler
* TABLE* table - table where to save the results
* mcs_handler_info* pnt to an envelope struct
* TABLE* table - dest table to put the results into
* RETURN:
* rc as int
***********************************************************/
@ -5325,8 +5287,15 @@ int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table)
MIGR::infinidb_vtable.vtable_state = MIGR::INFINIDB_ERROR;
return ER_INTERNAL_ERROR;
}
#endif
// Set this to close all outstanding FEP connections on
// client disconnect in handlerton::closecon_handlerton().
if ( !thd_get_ha_data(thd, mcs_hton))
{
thd_set_ha_data(thd, mcs_hton, reinterpret_cast<void*>(0x42));
}
// prevent "create table as select" from running on slave
MIGR::infinidb_vtable.hasInfiniDBTable = true;