1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-08 14:22:09 +03:00

Merge pull request #574 from mariadb-corporation/develop

Develop
This commit is contained in:
david hill
2018-09-19 14:01:38 -05:00
committed by GitHub
164 changed files with 7330 additions and 3108 deletions

View File

@@ -27,10 +27,12 @@ add_library(calmysql SHARED ${libcalmysql_SRCS})
target_link_libraries(calmysql ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${NETSNMP_LIBRARIES} ${SERVER_BUILD_INCLUDE_DIR}/../libservices/libmysqlservices.a threadpool)
SET_TARGET_PROPERTIES(calmysql PROPERTIES LINK_FLAGS "${calmysql_link_flags} -Wl,-E")
set_target_properties(calmysql PROPERTIES VERSION 1.0.0 SOVERSION 1)
SET ( is_columnstore_tables_SRCS
is_columnstore_tables.cpp
sm.cpp
)
add_library(is_columnstore_tables SHARED ${is_columnstore_tables_SRCS})
@@ -42,6 +44,7 @@ set_target_properties(is_columnstore_tables PROPERTIES VERSION 1.0.0 SOVERSION 1
SET ( is_columnstore_columns_SRCS
is_columnstore_columns.cpp
sm.cpp
)
add_library(is_columnstore_columns SHARED ${is_columnstore_columns_SRCS})
@@ -53,6 +56,7 @@ set_target_properties(is_columnstore_columns PROPERTIES VERSION 1.0.0 SOVERSION
SET ( is_columnstore_extents_SRCS
is_columnstore_extents.cpp
sm.cpp
)
add_library(is_columnstore_extents SHARED ${is_columnstore_extents_SRCS})
@@ -64,6 +68,7 @@ set_target_properties(is_columnstore_extents PROPERTIES VERSION 1.0.0 SOVERSION
SET ( is_columnstore_files_SRCS
is_columnstore_files.cpp
sm.cpp
)
add_library(is_columnstore_files SHARED ${is_columnstore_files_SRCS})

View File

@@ -37,43 +37,56 @@ DROP PROCEDURE IF EXISTS `table_usage` //
CREATE PROCEDURE table_usage (IN t_schema char(64), IN t_name char(64))
`table_usage`: BEGIN
DECLARE done INTEGER DEFAULT 0;
DECLARE dbname VARCHAR(64);
DECLARE tbname VARCHAR(64);
DECLARE object_ids TEXT;
DECLARE dictionary_object_ids TEXT;
DECLARE `locker` TINYINT UNSIGNED DEFAULT IS_USED_LOCK('table_usage');
DECLARE columns_list CURSOR FOR SELECT TABLE_SCHEMA, TABLE_NAME, GROUP_CONCAT(object_id) OBJECT_IDS, GROUP_CONCAT(dictionary_object_id) DICT_OBJECT_IDS FROM INFORMATION_SCHEMA.COLUMNSTORE_COLUMNS WHERE table_name = t_name and table_schema = t_schema GROUP BY table_schema, table_name;
DECLARE columns_list_sc CURSOR FOR SELECT TABLE_SCHEMA, TABLE_NAME, GROUP_CONCAT(object_id) OBJECT_IDS, GROUP_CONCAT(dictionary_object_id) DICT_OBJECT_IDS FROM INFORMATION_SCHEMA.COLUMNSTORE_COLUMNS WHERE table_schema = t_schema GROUP BY table_schema, table_name;
DECLARE columns_list_all CURSOR FOR SELECT TABLE_SCHEMA, TABLE_NAME, GROUP_CONCAT(object_id) OBJECT_IDS, GROUP_CONCAT(dictionary_object_id) DICT_OBJECT_IDS FROM INFORMATION_SCHEMA.COLUMNSTORE_COLUMNS GROUP BY table_schema, table_name;
DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = 1;
IF `locker` IS NOT NULL THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Error acquiring table_usage lock';
LEAVE `table_usage`;
END IF;
DO GET_LOCK('table_usage', 0);
DROP TABLE IF EXISTS columnstore_info.columnstore_columns;
DROP TABLE IF EXISTS columnstore_info.columnstore_files;
CREATE TABLE columnstore_info.columnstore_columns engine=myisam as (select * from information_schema.columnstore_columns);
ALTER TABLE columnstore_info.columnstore_columns ADD INDEX `object_id` (`object_id`);
ALTER TABLE columnstore_info.columnstore_columns ADD INDEX `dictionary_object_id` (`dictionary_object_id`);
CREATE TABLE columnstore_info.columnstore_files engine=myisam as (select * from information_schema.columnstore_files);
ALTER TABLE columnstore_info.columnstore_files ADD INDEX `object_id` (`object_id`);
CREATE TEMPORARY TABLE columnstore_info.columnstore_files (TABLE_SCHEMA VARCHAR(64), TABLE_NAME VARCHAR(64), DATA BIGINT, DICT BIGINT);
IF t_name IS NOT NULL THEN
SELECT TABLE_SCHEMA, TABLE_NAME, columnstore_info.format_filesize(data) as DATA_DISK_USAGE, columnstore_info.format_filesize(dict) as DICT_DISK_USAGE, columnstore_info.format_filesize(data + COALESCE(dict, 0)) as TOTAL_USAGE FROM (
SELECT TABLE_SCHEMA, TABLE_NAME, (SELECT sum(cf.file_size) as data FROM columnstore_info.columnstore_columns cc JOIN columnstore_info.columnstore_files cf ON cc.object_id = cf.object_id WHERE table_name = ics.table_name and table_schema = ics.table_schema) as data, (SELECT sum(cf.file_size) as dict FROM columnstore_info.columnstore_columns cc JOIN columnstore_info.columnstore_files cf ON cc.dictionary_object_id = cf.object_id WHERE table_name = ics.table_name and table_schema = ics.table_schema GROUP BY table_schema, table_name) as dict
FROM
columnstore_info.columnstore_columns ics where table_name = t_name and (table_schema = t_schema or t_schema IS NULL)
group by table_schema, table_name
) q;
OPEN columns_list;
ELSEIF t_schema IS NOT NULL THEN
SELECT TABLE_SCHEMA, TABLE_NAME, columnstore_info.format_filesize(data) as DATA_DISK_USAGE, columnstore_info.format_filesize(dict) as DICT_DISK_USAGE, columnstore_info.format_filesize(data + COALESCE(dict, 0)) as TOTAL_USAGE FROM (
SELECT TABLE_SCHEMA, TABLE_NAME, (SELECT sum(cf.file_size) as data FROM columnstore_info.columnstore_columns cc JOIN columnstore_info.columnstore_files cf ON cc.object_id = cf.object_id WHERE table_name = ics.table_name and table_schema = ics.table_schema) as data, (SELECT sum(cf.file_size) as dict FROM columnstore_info.columnstore_columns cc JOIN columnstore_info.columnstore_files cf ON cc.dictionary_object_id = cf.object_id WHERE table_name = ics.table_name and table_schema = ics.table_schema GROUP BY table_schema, table_name) as dict
FROM
columnstore_info.columnstore_columns ics where table_schema = t_schema
group by table_schema, table_name
) q;
OPEN columns_list_sc;
ELSE
SELECT TABLE_SCHEMA, TABLE_NAME, columnstore_info.format_filesize(data) as DATA_DISK_USAGE, columnstore_info.format_filesize(dict) as DICT_DISK_USAGE, columnstore_info.format_filesize(data + COALESCE(dict, 0)) as TOTAL_USAGE FROM (
SELECT TABLE_SCHEMA, TABLE_NAME, (SELECT sum(cf.file_size) as data FROM columnstore_info.columnstore_columns cc JOIN columnstore_info.columnstore_files cf ON cc.object_id = cf.object_id WHERE table_name = ics.table_name and table_schema = ics.table_schema) as data, (SELECT sum(cf.file_size) as dict FROM columnstore_info.columnstore_columns cc JOIN columnstore_info.columnstore_files cf ON cc.dictionary_object_id = cf.object_id WHERE table_name = ics.table_name and table_schema = ics.table_schema GROUP BY table_schema, table_name) as dict
FROM
columnstore_info.columnstore_columns ics
group by table_schema, table_name
) q;
OPEN columns_list_all;
END IF;
DROP TABLE IF EXISTS columnstore_info.columnstore_columns;
files_table: LOOP
IF t_name IS NOT NULL THEN
FETCH columns_list INTO dbname, tbname, object_ids, dictionary_object_ids;
ELSEIF t_schema IS NOT NULL THEN
FETCH columns_list_sc INTO dbname, tbname, object_ids, dictionary_object_ids;
ELSE
FETCH columns_list_all INTO dbname, tbname, object_ids, dictionary_object_ids;
END IF;
IF done = 1 THEN LEAVE files_table;
END IF;
INSERT INTO columnstore_info.columnstore_files (SELECT dbname, tbname, sum(file_size), 0 FROM information_schema.columnstore_files WHERE find_in_set(object_id, object_ids));
IF dictionary_object_ids IS NOT NULL THEN
UPDATE columnstore_info.columnstore_files SET DICT = (SELECT sum(file_size) FROM information_schema.columnstore_files WHERE find_in_set(object_id, dictionary_object_ids)) WHERE TABLE_SCHEMA = dbname AND TABLE_NAME = tbname;
END IF;
END LOOP;
IF t_name IS NOT NULL THEN
CLOSE columns_list;
ELSEIF t_schema IS NOT NULL THEN
CLOSE columns_list_sc;
ELSE
CLOSE columns_list_all;
END IF;
SELECT TABLE_SCHEMA, TABLE_NAME, columnstore_info.format_filesize(DATA) as DATA_DISK_USAGE, columnstore_info.format_filesize(DICT) as DICT_DATA_USAGE, columnstore_info.format_filesize(DATA + COALESCE(DICT, 0)) as TOTAL_USAGE FROM columnstore_info.columnstore_files;
DROP TABLE IF EXISTS columnstore_info.columnstore_files;
DO RELEASE_LOCK('table_usage');
END //

View File

@@ -1156,7 +1156,11 @@ create_calpont_group_by_handler(THD* thd, Query* query)
{
ha_calpont_group_by_handler* handler = NULL;
if ( thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE )
// Create a handler if there is an agregate or a GROUP BY
// and if vtable was explicitly disabled.
if ( thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE
&& thd->variables.infinidb_vtable_mode == 0
&& ( query->group_by || thd->lex->select_lex.with_sum_func) )
{
handler = new ha_calpont_group_by_handler(thd, query);
@@ -1169,6 +1173,33 @@ create_calpont_group_by_handler(THD* thd, Query* query)
return handler;
}
/***********************************************************
* DESCRIPTION:
* GROUP BY handler constructor
* PARAMETERS:
* thd - THD pointer.
* query - Query describing structure
***********************************************************/
ha_calpont_group_by_handler::ha_calpont_group_by_handler(THD* thd_arg, Query* query)
: group_by_handler(thd_arg, calpont_hton),
select(query->select),
table_list(query->from),
distinct(query->distinct),
where(query->where),
group_by(query->group_by),
order_by(query->order_by),
having(query->having)
{
}
/***********************************************************
* DESCRIPTION:
* GROUP BY destructor
***********************************************************/
ha_calpont_group_by_handler::~ha_calpont_group_by_handler()
{
}
/***********************************************************
* DESCRIPTION:
* Makes the plan and prepares the data
@@ -1258,4 +1289,36 @@ mysql_declare_plugin(columnstore)
0 /* config flags */
}
mysql_declare_plugin_end;
maria_declare_plugin(columnstore)
{
MYSQL_STORAGE_ENGINE_PLUGIN,
&columnstore_storage_engine,
"Columnstore",
"MariaDB",
"Columnstore storage engine",
PLUGIN_LICENSE_GPL,
columnstore_init_func,
columnstore_done_func,
0x0100, /* 1.0 */
NULL, /* status variables */
calpont_system_variables, /* system variables */
"1.0", /* string version */
MariaDB_PLUGIN_MATURITY_STABLE /* maturity */
},
{
MYSQL_STORAGE_ENGINE_PLUGIN,
&infinidb_storage_engine,
"InfiniDB",
"MariaDB",
"Columnstore storage engine (deprecated: use columnstore)",
PLUGIN_LICENSE_GPL,
infinidb_init_func,
infinidb_done_func,
0x0100, /* 1.0 */
NULL, /* status variables */
calpont_system_variables, /* system variables */
"1.0", /* string version */
MariaDB_PLUGIN_MATURITY_STABLE /* maturity */
}
maria_declare_plugin_end;

View File

@@ -255,12 +255,16 @@ public:
* One should read comments in server/sql/group_by_handler.h
* Attributes:
* select - attribute contains all GROUP BY, HAVING, ORDER items and calls it
* an extended SELECT list accordin to comments in
* server/sql/group_handler.cc.
* So the temporary table for
* select count(*) from b group by a having a > 3 order by a
* will have 4 columns not 1.
* However server ignores all NULLs used in GROUP BY, HAVING, ORDER.
* an extended SELECT list according to comments in
* server/sql/group_handler.cc.
* So the temporary table for
* select count(*) from b group by a having a > 3 order by a
* will have 4 columns not 1.
* However server ignores all NULLs used in
* GROUP BY, HAVING, ORDER.
* select_list_descr - contains Item description returned by Item->print()
* that is used in lookup for corresponding columns in
* extended SELECT list.
* table_list - contains all tables involved. Must be CS tables only.
* distinct - looks like a useless thing for now. Couldn't get it set by server.
* where - where items.
@@ -275,17 +279,8 @@ public:
class ha_calpont_group_by_handler: public group_by_handler
{
public:
ha_calpont_group_by_handler(THD* thd_arg, Query* query)
: group_by_handler(thd_arg, calpont_hton),
select(query->select),
table_list(query->from),
distinct(query->distinct),
where(query->where),
group_by(query->group_by),
order_by(query->order_by),
having(query->having)
{ }
~ha_calpont_group_by_handler() { }
ha_calpont_group_by_handler(THD* thd_arg, Query* query);
~ha_calpont_group_by_handler();
int init_scan();
int next_row();
int end_scan();

View File

@@ -1912,6 +1912,79 @@ pair<string, string> parseTableName(const string& tn)
}
//
// get_field_default_value: Returns the default value as a string value
// NOTE: This is duplicated code copied from show.cc and a MDEV-17006 has
// been created.
//
static bool get_field_default_value(THD *thd, Field *field, String *def_value,
bool quoted)
{
bool has_default;
enum enum_field_types field_type= field->type();
has_default= (field->default_value ||
(!(field->flags & NO_DEFAULT_VALUE_FLAG) &&
field->unireg_check != Field::NEXT_NUMBER));
def_value->length(0);
if (has_default)
{
StringBuffer<MAX_FIELD_WIDTH> str(field->charset());
if (field->default_value)
{
field->default_value->print(&str);
if (field->default_value->expr->need_parentheses_in_default())
{
def_value->set_charset(&my_charset_utf8mb4_general_ci);
def_value->append('(');
def_value->append(str);
def_value->append(')');
}
else
def_value->append(str);
}
else if (!field->is_null())
{ // Not null by default
if (field_type == MYSQL_TYPE_BIT)
{
str.qs_append('b');
str.qs_append('\'');
str.qs_append(field->val_int(), 2);
str.qs_append('\'');
quoted= 0;
}
else
{
field->val_str(&str);
if (!field->str_needs_quotes())
quoted= 0;
}
if (str.length())
{
StringBuffer<MAX_FIELD_WIDTH> def_val;
uint dummy_errors;
/* convert to system_charset_info == utf8 */
def_val.copy(str.ptr(), str.length(), field->charset(),
system_charset_info, &dummy_errors);
if (quoted)
append_unescaped(def_value, def_val.ptr(), def_val.length());
else
def_value->append(def_val);
}
else if (quoted)
def_value->set(STRING_WITH_LEN("''"), system_charset_info);
}
else if (field->maybe_null() && quoted)
def_value->set(STRING_WITH_LEN("NULL"), system_charset_info); // Null as default
else
return 0;
}
return has_default;
}
int ha_calpont_impl_create_(const char* name, TABLE* table_arg, HA_CREATE_INFO* create_info, cal_connection_info& ci)
{
#ifdef INFINIDB_DEBUG
@@ -2045,7 +2118,7 @@ int ha_calpont_impl_create_(const char* name, TABLE* table_arg, HA_CREATE_INFO*
}
// @bug 3908. error out primary key for now.
if (table_arg->key_info && table_arg->key_info->name && string(table_arg->key_info->name) == "PRIMARY")
if (table_arg->key_info && table_arg->key_info->name.length && string(table_arg->key_info->name.str) == "PRIMARY")
{
string emsg = logging::IDBErrorInfo::instance()->errorMsg(ERR_CONSTRAINTS);
setError(thd, ER_CHECK_NOT_IMPLEMENTED, emsg);
@@ -2096,6 +2169,97 @@ int ha_calpont_impl_create_(const char* name, TABLE* table_arg, HA_CREATE_INFO*
return 1;
}
//
// Check if this is a "CREATE TABLE ... LIKE " statement.
// If so generate a full create table statement using the properties of
// the source table. Note that source table has to be a columnstore table and
// we only check for currently supported options.
//
if (thd->lex->create_info.like())
{
TABLE_SHARE *share = table_arg->s;
my_bitmap_map *old_map; // To save the read_set
char datatype_buf[MAX_FIELD_WIDTH], def_value_buf[MAX_FIELD_WIDTH];
String datatype, def_value;
ostringstream oss;
string tbl_name (name+2);
std::replace(tbl_name.begin(), tbl_name.end(), '/', '.');
// Save the current read_set map and mark it for read
old_map= tmp_use_all_columns(table_arg, table_arg->read_set);
oss << "CREATE TABLE " << tbl_name << " (";
restore_record(table_arg, s->default_values);
for (Field **field= table_arg->field; *field; field++)
{
uint flags = (*field)->flags;
datatype.set(datatype_buf, sizeof(datatype_buf), system_charset_info);
(*field)->sql_type(datatype);
if (field != table_arg->field)
oss << ", ";
oss << (*field)->field_name.str << " " << datatype.ptr();
if (flags & NOT_NULL_FLAG)
oss << " NOT NULL";
def_value.set(def_value_buf, sizeof(def_value_buf), system_charset_info);
if (get_field_default_value(thd, *field, &def_value, true)) {
oss << " DEFAULT " << def_value.c_ptr();
}
if ((*field)->comment.length)
{
String comment;
append_unescaped(&comment, (*field)->comment.str, (*field)->comment.length);
oss << " COMMENT ";
oss << comment.c_ptr();
}
}
// End the list of columns
oss<< ") ENGINE=columnstore ";
// Process table level options
if (create_info->auto_increment_value > 1)
{
oss << " AUTO_INCREMENT=" << create_info->auto_increment_value;
}
if (share->table_charset)
{
oss << " DEFAULT CHARSET=" << share->table_charset->csname;
}
// Process table level options such as MIN_ROWS, MAX_ROWS, COMMENT
if (share->min_rows)
{
char buff[80];
longlong10_to_str(share->min_rows, buff, 10);
oss << " MIN_ROWS=" << buff;
}
if (share->max_rows) {
char buff[80];
longlong10_to_str(share->max_rows, buff, 10);
oss << " MAX_ROWS=" << buff;
}
if (share->comment.length) {
String comment;
append_unescaped(&comment, share->comment.str, share->comment.length);
oss << " COMMENT ";
oss << comment.c_ptr();
}
oss << ";";
stmt = oss.str();
tmp_restore_column_map(table_arg->read_set, old_map);
}
rc = ProcessDDLStatement(stmt, db, tbl, tid2sid(thd->thread_id), emsg, compressiontype, isAnyAutoincreCol, startValue, columnName);
if (rc != 0)
@@ -2214,8 +2378,8 @@ int ha_calpont_impl_rename_table_(const char* from, const char* to, cal_connecti
stmt = "alter table `" + fromPair.second + "` rename to `" + toPair.second + "`;";
string db;
if ( thd->db )
db = thd->db;
if ( thd->db.length )
db = thd->db.str;
else if ( fromPair.first.length() != 0 )
db = fromPair.first;
else
@@ -2224,7 +2388,7 @@ int ha_calpont_impl_rename_table_(const char* from, const char* to, cal_connecti
int rc = ProcessDDLStatement(stmt, db, "", tid2sid(thd->thread_id), emsg);
if (rc != 0)
push_warning(thd, Sql_condition::WARN_LEVEL_ERROR, 9999, emsg.c_str());
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, emsg.c_str());
return rc;
}
@@ -2245,8 +2409,8 @@ extern "C"
THD* thd = current_thd;
string db("");
if ( thd->db )
db = thd->db;
if ( thd->db.length )
db = thd->db.str;
int compressiontype = thd->variables.infinidb_compression_type;
@@ -2266,7 +2430,7 @@ extern "C"
int rc = ProcessDDLStatement(stmt, db, "", tid2sid(thd->thread_id), emsg, compressiontype);
if (rc != 0)
push_warning(thd, Sql_condition::WARN_LEVEL_ERROR, 9999, emsg.c_str());
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, emsg.c_str());
return rc;
}

View File

@@ -121,7 +121,7 @@ int buildBuffer(uchar* buf, string& buffer, int& columns, TABLE* table)
columns++;
cols.append((*field)->field_name);
cols.append((*field)->field_name.str);
if (ptr == end_ptr)
{
@@ -236,7 +236,7 @@ uint32_t buildValueList (TABLE* table, cal_connection_info& ci )
}
}
ci.colNameList.push_back((*field)->field_name);
ci.colNameList.push_back((*field)->field_name.str);
columnPos++;
}
@@ -895,6 +895,11 @@ int ha_calpont_impl_write_batch_row_(uchar* buf, TABLE* table, cal_impl_if::cal_
longlong tmp = my_time_packed_from_binary(pos, table->field[colpos]->decimals());
TIME_from_longlong_time_packed(&ltime, tmp);
if (ltime.neg)
{
fprintf(ci.filePtr, "-");
}
if (!ltime.second_part)
{
fprintf(ci.filePtr, "%02d:%02d:%02d%c",
@@ -1815,8 +1820,11 @@ int ha_calpont_impl_write_batch_row_(uchar* buf, TABLE* table, cal_impl_if::cal_
}
else if (ci.columnTypes[colpos].colWidth < 16777216)
{
dataLength = *(uint32_t*) buf;
buf = buf + 3 ;
dataLength = *(uint16_t*) buf;
buf = buf + 2 ;
if (*(uint8_t*)buf)
dataLength += 256*256*(*(uint8_t*)buf) ;
buf++;
}
else
{

File diff suppressed because it is too large Load Diff

View File

@@ -61,7 +61,7 @@ using namespace std;
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/regex.hpp>
#include <boost/thread.hpp>
using namespace boost;
//using namespace boost;
#include "idb_mysql.h"
@@ -276,104 +276,6 @@ void storeNumericField(Field** f, int64_t value, CalpontSystemCatalog::ColType&
}
}
void storeNumericFieldGroupBy(Field** f, int64_t value, CalpontSystemCatalog::ColType& ct)
{
// unset null bit first
if ((*f)->null_ptr)
*(*f)->null_ptr &= ~(*f)->null_bit;
// For unsigned, use the ColType returned in the row rather than the
// unsigned_flag set by mysql. This is because mysql gets it wrong for SUM()
// Hopefully, in all other cases we get it right.
switch ((*f)->type())
{
case MYSQL_TYPE_NEWDECIMAL:
{
Field_new_decimal* f2 = (Field_new_decimal*)*f;
// @bug4388 stick to InfiniDB's scale in case mysql gives wrong scale due
// to create vtable limitation.
if (f2->dec < ct.scale)
f2->dec = ct.scale;
char buf[256];
dataconvert::DataConvert::decimalToString(value, (unsigned)ct.scale, buf, 256, ct.colDataType);
f2->store(buf, strlen(buf), f2->charset());
break;
}
case MYSQL_TYPE_TINY: //TINYINT type
{
Field_tiny* f2 = (Field_tiny*)*f;
longlong int_val = (longlong)value;
f2->store(int_val, f2->unsigned_flag);
break;
}
case MYSQL_TYPE_SHORT: //SMALLINT type
{
Field_short* f2 = (Field_short*)*f;
longlong int_val = (longlong)value;
f2->store(int_val, f2->unsigned_flag);
break;
}
case MYSQL_TYPE_LONG: //INT type
{
Field_long* f2 = (Field_long*)*f;
longlong int_val = (longlong)value;
f2->store(int_val, f2->unsigned_flag);
break;
}
case MYSQL_TYPE_LONGLONG: //BIGINT type
{
Field_longlong* f2 = (Field_longlong*)*f;
longlong int_val = (longlong)value;
f2->store(int_val, f2->unsigned_flag);
break;
}
case MYSQL_TYPE_FLOAT: // FLOAT type
{
Field_float* f2 = (Field_float*)*f;
float float_val = *(float*)(&value);
f2->store(float_val);
break;
}
case MYSQL_TYPE_DOUBLE: // DOUBLE type
{
Field_double* f2 = (Field_double*)*f;
double double_val = *(double*)(&value);
f2->store(double_val);
break;
}
case MYSQL_TYPE_VARCHAR:
{
Field_varstring* f2 = (Field_varstring*)*f;
char tmp[25];
if (ct.colDataType == CalpontSystemCatalog::DECIMAL)
dataconvert::DataConvert::decimalToString(value, (unsigned)ct.scale, tmp, 25, ct.colDataType);
else
snprintf(tmp, 25, "%ld", value);
f2->store(tmp, strlen(tmp), f2->charset());
break;
}
default:
{
Field_longlong* f2 = (Field_longlong*)*f;
longlong int_val = (longlong)value;
f2->store(int_val, f2->unsigned_flag);
break;
}
}
}
//
// @bug 2244. Log exception related to lost connection to ExeMgr.
// Log exception error from calls to sm::tpl_scan_fetch in fetchNextRow()
@@ -582,7 +484,7 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci, bool h
*(*f)->null_ptr &= ~(*f)->null_bit;
intColVal = row.getUintField<8>(s);
DataConvert::datetimeToString(intColVal, tmp, 255);
DataConvert::datetimeToString(intColVal, tmp, 255, colType.precision);
/* setting the field_length is a sort-of hack. The length
* at this point can be long enough to include mseconds.
@@ -606,7 +508,7 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci, bool h
*(*f)->null_ptr &= ~(*f)->null_bit;
intColVal = row.getUintField<8>(s);
DataConvert::timeToString(intColVal, tmp, 255);
DataConvert::timeToString(intColVal, tmp, 255, colType.precision);
Field_varstring* f2 = (Field_varstring*)*f;
f2->store(tmp, strlen(tmp), f2->charset());
@@ -781,8 +683,11 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci, bool h
//double double_val = *(double*)(&value);
//f2->store(double_val);
if (f2->decimals() < (uint32_t)row.getScale(s))
f2->dec = (uint32_t)row.getScale(s);
if ((f2->decimals() == DECIMAL_NOT_SPECIFIED && row.getScale(s) > 0)
|| f2->decimals() < row.getScale(s))
{
f2->dec = row.getScale(s);
}
f2->store(dl);
@@ -974,7 +879,7 @@ uint32_t doUpdateDelete(THD* thd)
}
//@Bug 4387. Check BRM status before start statement.
scoped_ptr<DBRM> dbrmp(new DBRM());
boost::scoped_ptr<DBRM> dbrmp(new DBRM());
int rc = dbrmp->isReadWrite();
thd->infinidb_vtable.isInfiniDBDML = true;
@@ -1130,7 +1035,7 @@ uint32_t doUpdateDelete(THD* thd)
schemaName = string(item->db_name);
columnAssignmentPtr = new ColumnAssignment();
columnAssignmentPtr->fColumn = string(item->name);
columnAssignmentPtr->fColumn = string(item->name.str);
columnAssignmentPtr->fOperator = "=";
columnAssignmentPtr->fFuncScale = 0;
Item* value = value_it++;
@@ -1276,7 +1181,7 @@ uint32_t doUpdateDelete(THD* thd)
{
Item_field* tmp = (Item_field*)value;
if (!tmp->field_name) //null
if (!tmp->field_name.length) //null
{
columnAssignmentPtr->fScalarExpression = "NULL";
columnAssignmentPtr->fFromCol = false;
@@ -1397,9 +1302,9 @@ uint32_t doUpdateDelete(THD* thd)
if (deleteTable->get_num_of_tables() == 1)
{
schemaName = first_table->db;
tableName = first_table->table_name;
aliasName = first_table->alias;
schemaName = first_table->db.str;
tableName = first_table->table_name.str;
aliasName = first_table->alias.str;
qualifiedTablName->fName = tableName;
qualifiedTablName->fSchema = schemaName;
pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(dmlStatement);
@@ -1418,7 +1323,7 @@ uint32_t doUpdateDelete(THD* thd)
first_table = (TABLE_LIST*) thd->lex->select_lex.table_list.first;
schemaName = first_table->table->s->db.str;
tableName = first_table->table->s->table_name.str;
aliasName = first_table->alias;
aliasName = first_table->alias.str;
qualifiedTablName->fName = tableName;
qualifiedTablName->fSchema = schemaName;
pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(dmlStatement);
@@ -1429,7 +1334,7 @@ uint32_t doUpdateDelete(THD* thd)
first_table = (TABLE_LIST*) thd->lex->select_lex.table_list.first;
schemaName = first_table->table->s->db.str;
tableName = first_table->table->s->table_name.str;
aliasName = first_table->alias;
aliasName = first_table->alias.str;
qualifiedTablName->fName = tableName;
qualifiedTablName->fSchema = schemaName;
pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(dmlStatement);
@@ -1961,7 +1866,7 @@ uint32_t doUpdateDelete(THD* thd)
}
else
{
thd->set_row_count_func(dmlRowCount);
thd->set_row_count_func(dmlRowCount+thd->get_row_count_func());
}
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, ER_WARN_DATA_OUT_OF_RANGE, errorMsg.c_str());
@@ -1969,7 +1874,7 @@ uint32_t doUpdateDelete(THD* thd)
else
{
// if (dmlRowCount != 0) //Bug 5117. Handling self join.
thd->set_row_count_func(dmlRowCount);
thd->set_row_count_func(dmlRowCount+thd->get_row_count_func());
//cout << " error status " << ci->rc << " and rowcount = " << dmlRowCount << endl;
@@ -2240,7 +2145,7 @@ extern "C"
bool includeInput = true;
string pstr(parameter);
algorithm::to_lower(pstr);
boost::algorithm::to_lower(pstr);
if (pstr == PmSmallSideMaxMemory)
{
@@ -2386,8 +2291,8 @@ extern "C"
{
tableName.table = args->args[0];
if (thd->db)
tableName.schema = thd->db;
if (thd->db.length)
tableName.schema = thd->db.str;
else
{
string msg("No schema information provided");
@@ -2524,8 +2429,8 @@ extern "C"
{
tableName.table = args->args[0];
if (thd->db)
tableName.schema = thd->db;
if (thd->db.length)
tableName.schema = thd->db.str;
else
{
return -1;
@@ -3019,8 +2924,8 @@ int ha_calpont_impl_rnd_init(TABLE* table)
ti.csep->verID(verID);
ti.csep->sessionID(sessionID);
if (thd->db)
ti.csep->schemaName(thd->db);
if (thd->db.length)
ti.csep->schemaName(thd->db.str);
ti.csep->traceFlags(ci->traceFlags);
ti.msTablePtr = table;
@@ -3113,8 +3018,8 @@ int ha_calpont_impl_rnd_init(TABLE* table)
csep->verID(verID);
csep->sessionID(sessionID);
if (thd->db)
csep->schemaName(thd->db);
if (thd->db.length)
csep->schemaName(thd->db.str);
csep->traceFlags(ci->traceFlags);
@@ -3779,12 +3684,12 @@ int ha_calpont_impl_delete_table(const char* name)
if (thd->lex->sql_command == SQLCOM_DROP_DB)
{
dbName = thd->lex->name.str;
dbName = const_cast<char*>(thd->lex->name.str);
}
else
{
TABLE_LIST* first_table = (TABLE_LIST*) thd->lex->select_lex.table_list.first;
dbName = first_table->db;
dbName = const_cast<char*>(first_table->db.str);
}
if (!dbName)
@@ -3806,7 +3711,7 @@ int ha_calpont_impl_delete_table(const char* name)
if (strcmp(dbName, "calpontsys") == 0 && string(name).find("@0024vtable") == string::npos)
{
std::string stmt(idb_mysql_query_str(thd));
algorithm::to_upper(stmt);
boost::algorithm::to_upper(stmt);
//@Bug 2432. systables can be dropped with restrict
if (stmt.find(" RESTRICT") != string::npos)
@@ -3958,7 +3863,7 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table)
if ((thd->lex)->sql_command == SQLCOM_INSERT)
{
string insertStmt = idb_mysql_query_str(thd);
algorithm::to_lower(insertStmt);
boost::algorithm::to_lower(insertStmt);
string intoStr("into");
size_t found = insertStmt.find(intoStr);
@@ -4106,7 +4011,7 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table)
#ifdef _MSC_VER
aCmdLine = aCmdLine + "/bin/cpimport.exe -N -P " + to_string(localModuleId) + " -s " + ci->delimiter + " -e 0" + " -E " + escapechar + ci->enclosed_by + " ";
#else
aCmdLine = aCmdLine + "/bin/cpimport -m 1 -N -P " + to_string(localModuleId) + " -s " + ci->delimiter + " -e 0" + " -E " + escapechar + ci->enclosed_by + " ";
aCmdLine = aCmdLine + "/bin/cpimport -m 1 -N -P " + boost::to_string(localModuleId) + " -s " + ci->delimiter + " -e 0" + " -E " + escapechar + ci->enclosed_by + " ";
#endif
}
}
@@ -4434,7 +4339,7 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table)
ci->stats.fQueryType = CalpontSelectExecutionPlan::queryTypeToString(CalpontSelectExecutionPlan::LOAD_DATA_INFILE);
//@Bug 4387. Check BRM status before start statement.
scoped_ptr<DBRM> dbrmp(new DBRM());
boost::scoped_ptr<DBRM> dbrmp(new DBRM());
int rc = dbrmp->isReadWrite();
if (rc != 0 )
@@ -4752,7 +4657,7 @@ int ha_calpont_impl_commit (handlerton* hton, THD* thd, bool all)
return 0;
//@Bug 5823 check if any active transaction for this session
scoped_ptr<DBRM> dbrmp(new DBRM());
boost::scoped_ptr<DBRM> dbrmp(new DBRM());
BRM::TxnID txnId = dbrmp->getTxnID(tid2sid(thd->thread_id));
if (!txnId.valid)
@@ -5042,6 +4947,7 @@ int ha_calpont_impl_external_lock(THD* thd, TABLE* table, int lock_type)
{
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, infinidb_autoswitch_warning.c_str());
}
ci->queryState = 0;
}
else // vtable mode
{
@@ -5209,10 +5115,13 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
ci->warningMsg = msg;
}
// if the previous query has error, re-establish the connection
// If the previous query has error and
// this is not a subquery run by the server(MCOL-1601)
// re-establish the connection
if (ci->queryState != 0)
{
sm::sm_cleanup(ci->cal_conn_hndl);
if( ci->cal_conn_hndl_st.size() == 0 )
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
@@ -5234,6 +5143,7 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
hndl = ci->cal_conn_hndl;
ci->cal_conn_hndl_st.push(ci->cal_conn_hndl);
if (!csep)
csep.reset(new CalpontSelectExecutionPlan());
@@ -5254,8 +5164,8 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
csep->verID(verID);
csep->sessionID(sessionID);
if (group_hand->table_list->db_length)
csep->schemaName(group_hand->table_list->db);
if (group_hand->table_list->db.length)
csep->schemaName(group_hand->table_list->db.str);
csep->traceFlags(ci->traceFlags);
@@ -5275,7 +5185,6 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
execplan::CalpontSelectExecutionPlan::ColumnMap::iterator colMapIter;
execplan::CalpontSelectExecutionPlan::ColumnMap::iterator condColMapIter;
execplan::ParseTree* ptIt;
execplan::ReturnedColumn* rcIt;
for (TABLE_LIST* tl = gi.groupByTables; tl; tl = tl->next_local)
{
@@ -5306,8 +5215,12 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
return 0;
string query;
query.assign(thd->infinidb_vtable.original_query.ptr(),
thd->infinidb_vtable.original_query.length());
// Set the query text only once if the server executes
// subqueries separately.
if(ci->queryState)
query.assign("<subquery of the previous>");
else
query.assign(thd->query_string.str(), thd->query_string.length());
csep->data(query);
try
@@ -5437,11 +5350,15 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
idbassert(hndl != 0);
hndl->csc = csc;
// The next section is useless
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE)
ti.conn_hndl = hndl;
else
{
ci->cal_conn_hndl = hndl;
ci->cal_conn_hndl_st.pop();
ci->cal_conn_hndl_st.push(ci->cal_conn_hndl);
}
try
{
hndl->connect();
@@ -5474,11 +5391,11 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
(thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) ||
(thd->infinidb_vtable.vtable_state == THD::INFINIDB_REDO_QUERY))
{
if (ti.tpl_ctx == 0)
{
ti.tpl_ctx = new sm::cpsm_tplh_t();
ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t());
}
// MCOL-1601 Using stacks of ExeMgr conn hndls, table and scan contexts.
ti.tpl_ctx = new sm::cpsm_tplh_t();
ti.tpl_ctx_st.push(ti.tpl_ctx);
ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t());
ti.tpl_scan_ctx_st.push(ti.tpl_scan_ctx);
// make sure rowgroup is null so the new meta data can be taken. This is for some case mysql
// call rnd_init for a table more than once.
@@ -5558,6 +5475,7 @@ error:
if (ci->cal_conn_hndl)
{
// end_query() should be called here.
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
@@ -5569,6 +5487,7 @@ internal_error:
if (ci->cal_conn_hndl)
{
// end_query() should be called here.
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
@@ -5800,6 +5719,12 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
ci->cal_conn_hndl = 0;
// clear querystats because no query stats available for cancelled query
ci->queryStats = "";
if ( ci->cal_conn_hndl_st.size() )
{
ci->cal_conn_hndl_st.pop();
if ( ci->cal_conn_hndl_st.size() )
ci->cal_conn_hndl = ci->cal_conn_hndl_st.top();
}
}
return 0;
@@ -5809,6 +5734,7 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
cal_table_info ti = ci->tableMap[table];
sm::cpsm_conhdl_t* hndl;
bool clearScanCtx = false;
hndl = ci->cal_conn_hndl;
@@ -5816,6 +5742,8 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
{
if (ti.tpl_scan_ctx.get())
{
clearScanCtx = ( (ti.tpl_scan_ctx.get()->rowsreturned) &&
ti.tpl_scan_ctx.get()->rowsreturned == ti.tpl_scan_ctx.get()->getRowCount() );
try
{
sm::tpl_scan_close(ti.tpl_scan_ctx);
@@ -5827,10 +5755,31 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
}
ti.tpl_scan_ctx.reset();
if ( ti.tpl_scan_ctx_st.size() )
{
ti.tpl_scan_ctx_st.pop();
if ( ti.tpl_scan_ctx_st.size() )
ti.tpl_scan_ctx = ti.tpl_scan_ctx_st.top();
}
try
{
sm::tpl_close(ti.tpl_ctx, &hndl, ci->stats);
if(hndl)
{
sm::tpl_close(ti.tpl_ctx, &hndl, ci->stats, clearScanCtx);
// Normaly stats variables are set in external_lock method but we set it here
// since they we pretend we are in vtable_disabled mode and the stats vars won't be set.
// We sum the stats up here since server could run a number of
// queries e.g. each for a subquery in a filter.
if(hndl)
{
if (hndl->queryStats.length())
ci->queryStats += hndl->queryStats;
if (hndl->extendedStats.length())
ci->extendedStats += hndl->extendedStats;
if (hndl->miniStats.length())
ci->miniStats += hndl->miniStats;
}
}
ci->cal_conn_hndl = hndl;
@@ -5863,6 +5812,20 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
ti.tpl_ctx = 0;
if ( ti.tpl_ctx_st.size() )
{
ti.tpl_ctx_st.pop();
if ( ti.tpl_ctx_st.size() )
ti.tpl_ctx = ti.tpl_ctx_st.top();
}
if ( ci->cal_conn_hndl_st.size() )
{
ci->cal_conn_hndl_st.pop();
if ( ci->cal_conn_hndl_st.size() )
ci->cal_conn_hndl = ci->cal_conn_hndl_st.top();
}
ci->tableMap[table] = ti;
// push warnings from CREATE phase

View File

@@ -99,7 +99,7 @@ struct gp_walk_info
execplan::CalpontSelectExecutionPlan::ReturnedColumnList groupByCols;
execplan::CalpontSelectExecutionPlan::ReturnedColumnList subGroupByCols;
execplan::CalpontSelectExecutionPlan::ReturnedColumnList orderByCols;
std::vector <Item*> havingAggColsItems;
std::vector <Item*> extSelAggColsItems;
execplan::CalpontSelectExecutionPlan::ColumnMap columnMap;
// This vector temporarily hold the projection columns to be added
// to the returnedCols vector for subquery processing. It will be appended
@@ -148,6 +148,9 @@ struct gp_walk_info
int32_t recursionHWM;
std::stack<int32_t> rcBookMarkStack;
// Kludge for MCOL-1472
bool inCaseStmt;
gp_walk_info() : sessionid(0),
fatalParseError(false),
condPush(false),
@@ -163,7 +166,8 @@ struct gp_walk_info
lastSub(0),
derivedTbCnt(0),
recursionLevel(-1),
recursionHWM(0)
recursionHWM(0),
inCaseStmt(false)
{}
~gp_walk_info() {}
@@ -183,7 +187,9 @@ struct cal_table_info
{ }
~cal_table_info() {}
sm::cpsm_tplh_t* tpl_ctx;
std::stack<sm::cpsm_tplh_t*> tpl_ctx_st;
sm::sp_cpsm_tplsch_t tpl_scan_ctx;
std::stack<sm::sp_cpsm_tplsch_t> tpl_scan_ctx_st;
unsigned c; // for debug purpose
TABLE* msTablePtr; // no ownership
sm::cpsm_conhdl_t* conn_hndl;
@@ -269,6 +275,7 @@ struct cal_connection_info
}
sm::cpsm_conhdl_t* cal_conn_hndl;
std::stack<sm::cpsm_conhdl_t*> cal_conn_hndl_st;
int queryState;
CalTableMap tableMap;
sm::tableid_t currentTable;
@@ -326,15 +333,14 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& cse
void setError(THD* thd, uint32_t errcode, const std::string errmsg, gp_walk_info* gwi);
void setError(THD* thd, uint32_t errcode, const std::string errmsg);
void gp_walk(const Item* item, void* arg);
void parse_item (Item* item, std::vector<Item_field*>& field_vec, bool& hasNonSupportItem, uint16& parseInfo);
execplan::ReturnedColumn* buildReturnedColumn(Item* item, gp_walk_info& gwi, bool& nonSupport);
void parse_item (Item* item, std::vector<Item_field*>& field_vec, bool& hasNonSupportItem, uint16& parseInfo, gp_walk_info* gwip = NULL);
const std::string bestTableName(const Item_field* ifp);
bool isInfiniDB(TABLE* table_ptr);
// execution plan util functions prototypes
execplan::ReturnedColumn* buildReturnedColumn(Item* item, gp_walk_info& gwi, bool& nonSupport);
execplan::ReturnedColumn* buildFunctionColumn(Item_func* item, gp_walk_info& gwi, bool& nonSupport);
execplan::ArithmeticColumn* buildArithmeticColumn(Item_func* item, gp_walk_info& gwi, bool& nonSupport);
execplan::ReturnedColumn* buildReturnedColumn(Item* item, gp_walk_info& gwi, bool& nonSupport, bool pushdownHand = false);
execplan::ReturnedColumn* buildFunctionColumn(Item_func* item, gp_walk_info& gwi, bool& nonSupport, bool pushdownHand = false);
execplan::ArithmeticColumn* buildArithmeticColumn(Item_func* item, gp_walk_info& gwi, bool& nonSupport, bool pushdownHand = false);
execplan::ConstantColumn* buildDecimalColumn(Item* item, gp_walk_info& gwi);
execplan::SimpleColumn* buildSimpleColumn(Item_field* item, gp_walk_info& gwi);
execplan::FunctionColumn* buildCaseFunction(Item_func* item, gp_walk_info& gwi, bool& nonSupport);
@@ -346,7 +352,7 @@ void addIntervalArgs(Item_func* ifp, funcexp::FunctionParm& functionParms);
void castCharArgs(Item_func* ifp, funcexp::FunctionParm& functionParms);
void castDecimalArgs(Item_func* ifp, funcexp::FunctionParm& functionParms);
void castTypeArgs(Item_func* ifp, funcexp::FunctionParm& functionParms);
void parse_item (Item* item, std::vector<Item_field*>& field_vec, bool& hasNonSupportItem, uint16& parseInfo);
//void parse_item (Item* item, std::vector<Item_field*>& field_vec, bool& hasNonSupportItem, uint16& parseInfo);
bool isPredicateFunction(Item* item, gp_walk_info* gwip);
execplan::ParseTree* buildRowPredicate(execplan::RowColumn* lhs, execplan::RowColumn* rhs, std::string predicateOp);
bool buildRowColumnFilter(gp_walk_info* gwip, execplan::RowColumn* rhs, execplan::RowColumn* lhs, Item_func* ifp);

View File

@@ -642,9 +642,9 @@ void partitionByValue_common(UDF_ARGS* args, // input
}
else
{
if (current_thd->db)
if (current_thd->db.length)
{
schema = current_thd->db;
schema = current_thd->db.str;
}
else
{
@@ -1019,9 +1019,9 @@ extern "C"
}
else
{
if (current_thd->db)
if (current_thd->db.length)
{
schema = current_thd->db;
schema = current_thd->db.str;
}
else
{
@@ -1228,7 +1228,7 @@ extern "C"
{
tableName.table = args->args[0];
if (!current_thd->db)
if (!current_thd->db.length)
{
errMsg = "No schema name indicated.";
memcpy(result, errMsg.c_str(), errMsg.length());
@@ -1236,7 +1236,7 @@ extern "C"
return result;
}
tableName.schema = current_thd->db;
tableName.schema = current_thd->db.str;
parsePartitionString(args, 1, partitionNums, errMsg, tableName);
}
@@ -1316,14 +1316,14 @@ extern "C"
{
tableName.table = args->args[0];
if (!current_thd->db)
if (!current_thd->db.length)
{
current_thd->get_stmt_da()->set_overwrite_status(true);
current_thd->raise_error_printf(ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_PARTITION_NO_SCHEMA).c_str());
return result;
}
tableName.schema = current_thd->db;
tableName.schema = current_thd->db.str;
parsePartitionString(args, 1, partitionNums, errMsg, tableName);
}
@@ -1403,14 +1403,14 @@ extern "C"
{
tableName.table = args->args[0];
if (!current_thd->db)
if (!current_thd->db.length)
{
current_thd->get_stmt_da()->set_overwrite_status(true);
current_thd->raise_error_printf(ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_PARTITION_NO_SCHEMA).c_str());
return result;
}
tableName.schema = current_thd->db;
tableName.schema = current_thd->db.str;
parsePartitionString(args, 1, partSet, errMsg, tableName);
}
@@ -1724,9 +1724,9 @@ extern "C"
}
else
{
if (current_thd->db)
if (current_thd->db.length)
{
schema = current_thd->db;
schema = current_thd->db.str;
}
else
{

View File

@@ -582,7 +582,7 @@ execplan::ReturnedColumn* buildPseudoColumn(Item* item,
PseudoColumn* pc = new PseudoColumn(*sc, pseudoType);
// @bug5892. set alias for derived table column matching.
pc->alias(ifp->name ? ifp->name : "");
pc->alias(ifp->name.length ? ifp->name.str : "");
return pc;
}

View File

@@ -84,7 +84,7 @@ void View::transform()
for (; table_ptr; table_ptr = table_ptr->next_local)
{
// mysql put vtable here for from sub. we ignore it
if (string(table_ptr->table_name).find("$vtable") != string::npos)
if (string(table_ptr->table_name.str).find("$vtable") != string::npos)
continue;
string viewName = getViewName(table_ptr);
@@ -93,8 +93,8 @@ void View::transform()
{
SELECT_LEX* select_cursor = table_ptr->derived->first_select();
FromSubQuery* fromSub = new FromSubQuery(gwi, select_cursor);
string alias(table_ptr->alias);
gwi.viewName = make_aliasview("", alias, table_ptr->belong_to_view->alias, "");
string alias(table_ptr->alias.str);
gwi.viewName = make_aliasview("", alias, table_ptr->belong_to_view->alias.str, "");
algorithm::to_lower(alias);
fromSub->alias(alias);
gwi.derivedTbList.push_back(SCSEP(fromSub->transform()));
@@ -107,8 +107,8 @@ void View::transform()
else if (table_ptr->view)
{
// for nested view, the view name is vout.vin... format
CalpontSystemCatalog::TableAliasName tn = make_aliasview(table_ptr->db, table_ptr->table_name, table_ptr->alias, viewName);
gwi.viewName = make_aliastable(table_ptr->db, table_ptr->table_name, viewName);
CalpontSystemCatalog::TableAliasName tn = make_aliasview(table_ptr->db.str, table_ptr->table_name.str, table_ptr->alias.str, viewName);
gwi.viewName = make_aliastable(table_ptr->db.str, table_ptr->table_name.str, viewName);
View* view = new View(table_ptr->view->select_lex, &gwi);
view->viewName(gwi.viewName);
gwi.viewList.push_back(view);
@@ -121,9 +121,9 @@ void View::transform()
// trigger system catalog cache
if (infiniDB)
csc->columnRIDs(make_table(table_ptr->db, table_ptr->table_name), true);
csc->columnRIDs(make_table(table_ptr->db.str, table_ptr->table_name.str), true);
CalpontSystemCatalog::TableAliasName tn = make_aliasview(table_ptr->db, table_ptr->table_name, table_ptr->alias, viewName, infiniDB);
CalpontSystemCatalog::TableAliasName tn = make_aliasview(table_ptr->db.str, table_ptr->table_name.str, table_ptr->alias.str, viewName, infiniDB);
gwi.tbList.push_back(tn);
gwi.tableMap[tn] = make_pair(0, table_ptr);
fParentGwip->tableMap[tn] = make_pair(0, table_ptr);

View File

@@ -203,7 +203,7 @@ string ConvertFuncName(Item_sum* item)
switch (item->sum_func())
{
case Item_sum::COUNT_FUNC:
if (!item->arguments()[0]->name)
if (!item->arguments()[0]->name.str)
return "COUNT(*)";
return "COUNT";
@@ -289,6 +289,13 @@ string ConvertFuncName(Item_sum* item)
return "PERCENT_RANK";
break;
case Item_sum::PERCENTILE_CONT_FUNC:
return "PERCENTILE_CONT";
break;
case Item_sum::PERCENTILE_DISC_FUNC:
return "PERCENTILE_DISC";
case Item_sum::CUME_DIST_FUNC:
return "CUME_DIST";
break;
@@ -340,6 +347,7 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n
ac->distinct(item_sum->has_with_distinct());
Window_spec* win_spec = wf->window_spec;
SRCP srcp;
CalpontSystemCatalog::ColType ct; // For return type
// arguments
vector<SRCP> funcParms;
@@ -370,18 +378,25 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n
context.setColWidth(rt.colWidth);
context.setScale(rt.scale);
context.setPrecision(rt.precision);
context.setParamCount(funcParms.size());
mcsv1sdk::ColumnDatum colType;
mcsv1sdk::ColumnDatum colTypes[funcParms.size()];
// Turn on the Analytic flag so the function is aware it is being called
// as a Window Function.
context.setContextFlag(CONTEXT_IS_ANALYTIC);
COL_TYPES colTypes;
execplan::CalpontSelectExecutionPlan::ColumnMap::iterator cmIter;
// Build the column type vector.
// Modified for MCOL-1201 multi-argument aggregate
for (size_t i = 0; i < funcParms.size(); ++i)
{
colTypes.push_back(make_pair(funcParms[i]->alias(), funcParms[i]->resultType().colDataType));
const execplan::CalpontSystemCatalog::ColType& resultType
= funcParms[i]->resultType();
colType.dataType = resultType.colDataType;
colType.precision = resultType.precision;
colType.scale = resultType.scale;
colTypes[i] = colType;
}
// Call the user supplied init()
@@ -401,7 +416,6 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n
}
// Set the return type as set in init()
CalpontSystemCatalog::ColType ct;
ct.colDataType = context.getResultType();
ct.colWidth = context.getColWidth();
ct.scale = context.getScale();
@@ -419,10 +433,10 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n
{
case Item_sum::UDF_SUM_FUNC:
{
uint64_t bIgnoreNulls = (ac->getUDAFContext().getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS));
char sIgnoreNulls[18];
sprintf(sIgnoreNulls, "%lu", bIgnoreNulls);
srcp.reset(new ConstantColumn(sIgnoreNulls, (uint64_t)bIgnoreNulls, ConstantColumn::NUM)); // IGNORE/RESPECT NULLS. 1 => RESPECT
uint64_t bRespectNulls = (ac->getUDAFContext().getRunFlag(mcsv1sdk::UDAF_IGNORE_NULLS)) ? 0 : 1;
char sRespectNulls[18];
sprintf(sRespectNulls, "%lu", bRespectNulls);
srcp.reset(new ConstantColumn(sRespectNulls, (uint64_t)bRespectNulls, ConstantColumn::NUM)); // IGNORE/RESPECT NULLS. 1 => RESPECT
funcParms.push_back(srcp);
break;
}
@@ -881,11 +895,13 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n
return NULL;
}
ac->resultType(colType_MysqlToIDB(item_sum));
// bug5736. Make the result type double for some window functions when
// infinidb_double_for_decimal_math is set.
ac->adjustResultType();
if (item_sum->sum_func() != Item_sum::UDF_SUM_FUNC)
{
ac->resultType(colType_MysqlToIDB(item_sum));
// bug5736. Make the result type double for some window functions when
// infinidb_double_for_decimal_math is set.
ac->adjustResultType();
}
ac->expressionId(ci->expressionId++);

View File

@@ -63,6 +63,7 @@ template <class T> bool isnan(T);
#endif
#endif
#include "sql_plugin.h"
#include "sql_table.h"
#include "sql_select.h"
#include "mysqld_error.h"

View File

@@ -84,6 +84,7 @@ CREATE FUNCTION idbpartition RETURNS STRING soname 'libcalmysql.so';
CREATE FUNCTION idblocalpm RETURNS INTEGER soname 'libcalmysql.so';
CREATE FUNCTION mcssystemready RETURNS INTEGER soname 'libcalmysql.so';
CREATE FUNCTION mcssystemreadonly RETURNS INTEGER soname 'libcalmysql.so';
CREATE AGGREGATE FUNCTION regr_avgx RETURNS REAL soname 'libudf_mysql.so';
CREATE DATABASE IF NOT EXISTS infinidb_vtable;
CREATE DATABASE IF NOT EXISTS infinidb_querystats;

View File

@@ -56,10 +56,62 @@ ST_FIELD_INFO is_columnstore_columns_fields[] =
};
static void get_cond_item(Item_func* item, String** table, String** db)
{
char tmp_char[MAX_FIELD_WIDTH];
Item_field* item_field = (Item_field*) item->arguments()[0]->real_item();
if (strcasecmp(item_field->field_name.str, "table_name") == 0)
{
String str_buf(tmp_char, sizeof(tmp_char), system_charset_info);
*table = item->arguments()[1]->val_str(&str_buf);
return;
}
else if (strcasecmp(item_field->field_name.str, "table_schema") == 0)
{
String str_buf(tmp_char, sizeof(tmp_char), system_charset_info);
*db = item->arguments()[1]->val_str(&str_buf);
return;
}
}
static void get_cond_items(COND* cond, String** table, String** db)
{
if (cond->type() == Item::FUNC_ITEM)
{
Item_func* fitem = (Item_func*) cond;
if (fitem->arguments()[0]->real_item()->type() == Item::FIELD_ITEM &&
fitem->arguments()[1]->const_item())
{
get_cond_item(fitem, table, db);
}
}
else if ((cond->type() == Item::COND_ITEM) && (((Item_cond*) cond)->functype() == Item_func::COND_AND_FUNC))
{
List_iterator<Item> li(*((Item_cond*) cond)->argument_list());
Item* item;
while ((item = li++))
{
if (item->type() == Item::FUNC_ITEM)
{
get_cond_item((Item_func*)item, table, db);
}
else
{
get_cond_items(item, table, db);
}
}
}
}
static int is_columnstore_columns_fill(THD* thd, TABLE_LIST* tables, COND* cond)
{
CHARSET_INFO* cs = system_charset_info;
TABLE* table = tables->table;
String* table_name = NULL;
String* db_name = NULL;
boost::shared_ptr<execplan::CalpontSystemCatalog> systemCatalogPtr =
execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(execplan::CalpontSystemCatalog::idb_tid2sid(thd->thread_id));
@@ -69,9 +121,30 @@ static int is_columnstore_columns_fill(THD* thd, TABLE_LIST* tables, COND* cond)
systemCatalogPtr->identity(execplan::CalpontSystemCatalog::FE);
if (cond)
{
get_cond_items(cond, &table_name, &db_name);
}
for (std::vector<std::pair<execplan::CalpontSystemCatalog::OID, execplan::CalpontSystemCatalog::TableName> >::const_iterator it = catalog_tables.begin();
it != catalog_tables.end(); ++it)
{
if (db_name)
{
if ((*it).second.schema.compare(db_name->ptr()) != 0)
{
continue;
}
}
if (table_name)
{
if ((*it).second.table.compare(table_name->ptr()) != 0)
{
continue;
}
}
execplan::CalpontSystemCatalog::RIDList column_rid_list;
// Note a table may get dropped as you iterate over the list of tables.
@@ -184,8 +257,6 @@ static int is_columnstore_columns_fill(THD* thd, TABLE_LIST* tables, COND* cond)
}
}
return 0;
}

View File

@@ -52,14 +52,142 @@ ST_FIELD_INFO is_columnstore_extents_fields[] =
{0, 0, MYSQL_TYPE_NULL, 0, 0, 0, 0}
};
static int is_columnstore_extents_fill(THD* thd, TABLE_LIST* tables, COND* cond)
static int generate_result(BRM::OID_t oid, BRM::DBRM* emp, TABLE* table, THD* thd)
{
CHARSET_INFO* cs = system_charset_info;
TABLE* table = tables->table;
std::vector<struct BRM::EMEntry> entries;
std::vector<struct BRM::EMEntry>::iterator iter;
std::vector<struct BRM::EMEntry>::iterator end;
emp->getExtents(oid, entries, false, false, true);
if (entries.size() == 0)
return 0;
iter = entries.begin();
end = entries.end();
while (iter != end)
{
table->field[0]->store(oid);
if (iter->colWid > 0)
{
table->field[1]->store("Column", strlen("Column"), cs);
if (iter->partition.cprange.lo_val == std::numeric_limits<int64_t>::max() ||
iter->partition.cprange.lo_val <= (std::numeric_limits<int64_t>::min() + 2))
{
table->field[4]->set_null();
}
else
{
table->field[4]->set_notnull();
table->field[4]->store(iter->partition.cprange.lo_val);
}
if (iter->partition.cprange.hi_val == std::numeric_limits<int64_t>::max() ||
iter->partition.cprange.hi_val <= (std::numeric_limits<int64_t>::min() + 2))
{
table->field[5]->set_null();
}
else
{
table->field[5]->set_notnull();
table->field[5]->store(iter->partition.cprange.hi_val);
}
table->field[6]->store(iter->colWid);
}
else
{
table->field[1]->store("Dictionary", strlen("Dictionary"), cs);
table->field[4]->set_null();
table->field[5]->set_null();
table->field[6]->store(8192);
}
table->field[2]->store(iter->range.start);
table->field[3]->store(iter->range.start + (iter->range.size * 1024) - 1);
table->field[7]->store(iter->dbRoot);
table->field[8]->store(iter->partitionNum);
table->field[9]->store(iter->segmentNum);
table->field[10]->store(iter->blockOffset);
table->field[11]->store(iter->range.size * 1024);
table->field[12]->store(iter->HWM);
switch (iter->partition.cprange.isValid)
{
case 0:
table->field[13]->store("Invalid", strlen("Invalid"), cs);
break;
case 1:
table->field[13]->store("Updating", strlen("Updating"), cs);
break;
case 2:
table->field[13]->store("Valid", strlen("Valid"), cs);
break;
default:
table->field[13]->store("Unknown", strlen("Unknown"), cs);
break;
}
switch (iter->status)
{
case BRM::EXTENTAVAILABLE:
table->field[14]->store("Available", strlen("Available"), cs);
break;
case BRM::EXTENTUNAVAILABLE:
table->field[14]->store("Unavailable", strlen("Unavailable"), cs);
break;
case BRM::EXTENTOUTOFSERVICE:
table->field[14]->store("Out of service", strlen("Out of service"), cs);
break;
default:
table->field[14]->store("Unknown", strlen("Unknown"), cs);
}
// MCOL-1016: on multiple segments HWM is set to 0 on the lower
// segments, we don't want these to show as 8KB. The down side is
// if the column has less than 1 block it will show as 0 bytes.
// We have no lookahead without it getting messy so this is the
// best compromise.
if (iter->HWM == 0)
{
table->field[15]->store(0);
}
else
{
table->field[15]->store((iter->HWM + 1) * 8192);
}
if (schema_table_store_record(thd, table))
{
delete emp;
return 1;
}
iter++;
}
return 0;
}
static int is_columnstore_extents_fill(THD* thd, TABLE_LIST* tables, COND* cond)
{
BRM::OID_t cond_oid = 0;
TABLE* table = tables->table;
BRM::DBRM* emp = new BRM::DBRM();
if (!emp || !emp->isDBRMReady())
@@ -67,130 +195,83 @@ static int is_columnstore_extents_fill(THD* thd, TABLE_LIST* tables, COND* cond)
return 1;
}
if (cond && cond->type() == Item::FUNC_ITEM)
{
Item_func* fitem = (Item_func*) cond;
if ((fitem->functype() == Item_func::EQ_FUNC) && (fitem->argument_count() == 2))
{
if (fitem->arguments()[0]->real_item()->type() == Item::FIELD_ITEM &&
fitem->arguments()[1]->const_item())
{
// WHERE object_id = value
Item_field* item_field = (Item_field*) fitem->arguments()[0]->real_item();
if (strcasecmp(item_field->field_name.str, "object_id") == 0)
{
cond_oid = fitem->arguments()[1]->val_int();
return generate_result(cond_oid, emp, table, thd);
}
}
else if (fitem->arguments()[1]->real_item()->type() == Item::FIELD_ITEM &&
fitem->arguments()[0]->const_item())
{
// WHERE value = object_id
Item_field* item_field = (Item_field*) fitem->arguments()[1]->real_item();
if (strcasecmp(item_field->field_name.str, "object_id") == 0)
{
cond_oid = fitem->arguments()[0]->val_int();
return generate_result(cond_oid, emp, table, thd);
}
}
}
else if (fitem->functype() == Item_func::IN_FUNC)
{
// WHERE object_id in (value1, value2)
Item_field* item_field = (Item_field*) fitem->arguments()[0]->real_item();
if (strcasecmp(item_field->field_name.str, "object_id") == 0)
{
for (unsigned int i = 1; i < fitem->argument_count(); i++)
{
cond_oid = fitem->arguments()[i]->val_int();
int result = generate_result(cond_oid, emp, table, thd);
if (result)
return 1;
}
}
}
else if (fitem->functype() == Item_func::UNKNOWN_FUNC &&
strcasecmp(fitem->func_name(), "find_in_set") == 0)
{
// WHERE FIND_IN_SET(object_id, values)
String* tmp_var = fitem->arguments()[1]->val_str();
std::stringstream ss(tmp_var->ptr());
while (ss >> cond_oid)
{
int ret = generate_result(cond_oid, emp, table, thd);
if (ret)
return 1;
if (ss.peek() == ',')
ss.ignore();
}
}
}
execplan::ObjectIDManager oidm;
BRM::OID_t MaxOID = oidm.size();
for (BRM::OID_t oid = 3000; oid <= MaxOID; oid++)
{
emp->getExtents(oid, entries, false, false, true);
int result = generate_result(oid, emp, table, thd);
if (entries.size() == 0)
continue;
iter = entries.begin();
end = entries.end();
while (iter != end)
{
table->field[0]->store(oid);
if (iter->colWid > 0)
{
table->field[1]->store("Column", strlen("Column"), cs);
if (iter->partition.cprange.lo_val == std::numeric_limits<int64_t>::max() ||
iter->partition.cprange.lo_val <= (std::numeric_limits<int64_t>::min() + 2))
{
table->field[4]->set_null();
}
else
{
table->field[4]->set_notnull();
table->field[4]->store(iter->partition.cprange.lo_val);
}
if (iter->partition.cprange.hi_val == std::numeric_limits<int64_t>::max() ||
iter->partition.cprange.hi_val <= (std::numeric_limits<int64_t>::min() + 2))
{
table->field[5]->set_null();
}
else
{
table->field[5]->set_notnull();
table->field[5]->store(iter->partition.cprange.hi_val);
}
table->field[6]->store(iter->colWid);
}
else
{
table->field[1]->store("Dictionary", strlen("Dictionary"), cs);
table->field[4]->set_null();
table->field[5]->set_null();
table->field[6]->store(8192);
}
table->field[2]->store(iter->range.start);
table->field[3]->store(iter->range.start + (iter->range.size * 1024) - 1);
table->field[7]->store(iter->dbRoot);
table->field[8]->store(iter->partitionNum);
table->field[9]->store(iter->segmentNum);
table->field[10]->store(iter->blockOffset);
table->field[11]->store(iter->range.size * 1024);
table->field[12]->store(iter->HWM);
switch (iter->partition.cprange.isValid)
{
case 0:
table->field[13]->store("Invalid", strlen("Invalid"), cs);
break;
case 1:
table->field[13]->store("Updating", strlen("Updating"), cs);
break;
case 2:
table->field[13]->store("Valid", strlen("Valid"), cs);
break;
default:
table->field[13]->store("Unknown", strlen("Unknown"), cs);
break;
}
switch (iter->status)
{
case BRM::EXTENTAVAILABLE:
table->field[14]->store("Available", strlen("Available"), cs);
break;
case BRM::EXTENTUNAVAILABLE:
table->field[14]->store("Unavailable", strlen("Unavailable"), cs);
break;
case BRM::EXTENTOUTOFSERVICE:
table->field[14]->store("Out of service", strlen("Out of service"), cs);
break;
default:
table->field[14]->store("Unknown", strlen("Unknown"), cs);
}
// MCOL-1016: on multiple segments HWM is set to 0 on the lower
// segments, we don't want these to show as 8KB. The down side is
// if the column has less than 1 block it will show as 0 bytes.
// We have no lookahead without it getting messy so this is the
// best compromise.
if (iter->HWM == 0)
{
table->field[15]->store(0);
}
else
{
table->field[15]->store((iter->HWM + 1) * 8192);
}
if (schema_table_store_record(thd, table))
{
delete emp;
return 1;
}
iter++;
}
if (result)
return 1;
}
delete emp;

View File

@@ -84,12 +84,10 @@ static bool get_file_sizes(messageqcpp::MessageQueueClient* msgQueueClient, cons
}
}
static int is_columnstore_files_fill(THD* thd, TABLE_LIST* tables, COND* cond)
static int generate_result(BRM::OID_t oid, BRM::DBRM* emp, TABLE* table, THD* thd)
{
BRM::DBRM* emp = new BRM::DBRM();
std::vector<struct BRM::EMEntry> entries;
CHARSET_INFO* cs = system_charset_info;
TABLE* table = tables->table;
char oidDirName[WriteEngine::FILE_NAME_SIZE];
char fullFileName[WriteEngine::FILE_NAME_SIZE];
@@ -103,99 +101,184 @@ static int is_columnstore_files_fill(THD* thd, TABLE_LIST* tables, COND* cond)
oam::Oam oam_instance;
int pmId = 0;
emp->getExtents(oid, entries, false, false, true);
if (entries.size() == 0)
return 0;
std::vector<struct BRM::EMEntry>::const_iterator iter = entries.begin();
while ( iter != entries.end() ) //organize extents into files
{
// Don't include files more than once at different block offsets
if (iter->blockOffset > 0)
{
iter++;
return 0;
}
try
{
oam_instance.getDbrootPmConfig(iter->dbRoot, pmId);
}
catch (std::runtime_error)
{
// MCOL-1116: If we are here a DBRoot is offline/missing
iter++;
return 0;
}
table->field[0]->store(oid);
table->field[1]->store(iter->segmentNum);
table->field[2]->store(iter->partitionNum);
WriteEngine::Convertor::oid2FileName(oid, oidDirName, dbDir, iter->partitionNum, iter->segmentNum);
std::stringstream DbRootName;
DbRootName << "DBRoot" << iter->dbRoot;
std::string DbRootPath = config->getConfig("SystemConfig", DbRootName.str());
fileSize = compressedFileSize = 0;
snprintf(fullFileName, WriteEngine::FILE_NAME_SIZE, "%s/%s", DbRootPath.c_str(), oidDirName);
std::ostringstream oss;
oss << "pm" << pmId << "_WriteEngineServer";
std::string client = oss.str();
msgQueueClient = messageqcpp::MessageQueueClientPool::getInstance(oss.str());
if (!get_file_sizes(msgQueueClient, fullFileName, &fileSize, &compressedFileSize))
{
messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient);
delete emp;
return 1;
}
table->field[3]->store(fullFileName, strlen(fullFileName), cs);
if (fileSize > 0)
{
table->field[4]->set_notnull();
table->field[4]->store(fileSize);
if (compressedFileSize > 0)
{
table->field[5]->set_notnull();
table->field[5]->store(compressedFileSize);
}
else
{
table->field[5]->set_null();
}
}
else
{
table->field[4]->set_null();
table->field[5]->set_null();
}
if (schema_table_store_record(thd, table))
{
messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient);
delete emp;
return 1;
}
iter++;
messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient);
msgQueueClient = NULL;
}
return 0;
}
static int is_columnstore_files_fill(THD* thd, TABLE_LIST* tables, COND* cond)
{
BRM::DBRM* emp = new BRM::DBRM();
BRM::OID_t cond_oid = 0;
TABLE* table = tables->table;
if (!emp || !emp->isDBRMReady())
{
return 1;
}
if (cond && cond->type() == Item::FUNC_ITEM)
{
Item_func* fitem = (Item_func*) cond;
if ((fitem->functype() == Item_func::EQ_FUNC) && (fitem->argument_count() == 2))
{
if (fitem->arguments()[0]->real_item()->type() == Item::FIELD_ITEM &&
fitem->arguments()[1]->const_item())
{
// WHERE object_id = value
Item_field* item_field = (Item_field*) fitem->arguments()[0]->real_item();
if (strcasecmp(item_field->field_name.str, "object_id") == 0)
{
cond_oid = fitem->arguments()[1]->val_int();
return generate_result(cond_oid, emp, table, thd);
}
}
else if (fitem->arguments()[1]->real_item()->type() == Item::FIELD_ITEM &&
fitem->arguments()[0]->const_item())
{
// WHERE value = object_id
Item_field* item_field = (Item_field*) fitem->arguments()[1]->real_item();
if (strcasecmp(item_field->field_name.str, "object_id") == 0)
{
cond_oid = fitem->arguments()[0]->val_int();
return generate_result(cond_oid, emp, table, thd);
}
}
}
else if (fitem->functype() == Item_func::IN_FUNC)
{
// WHERE object_id in (value1, value2)
Item_field* item_field = (Item_field*) fitem->arguments()[0]->real_item();
if (strcasecmp(item_field->field_name.str, "object_id") == 0)
{
for (unsigned int i = 1; i < fitem->argument_count(); i++)
{
cond_oid = fitem->arguments()[i]->val_int();
int result = generate_result(cond_oid, emp, table, thd);
if (result)
return 1;
}
}
}
else if (fitem->functype() == Item_func::UNKNOWN_FUNC &&
strcasecmp(fitem->func_name(), "find_in_set") == 0)
{
// WHERE FIND_IN_SET(object_id, values)
String* tmp_var = fitem->arguments()[1]->val_str();
std::stringstream ss(tmp_var->ptr());
while (ss >> cond_oid)
{
int ret = generate_result(cond_oid, emp, table, thd);
if (ret)
return 1;
if (ss.peek() == ',')
ss.ignore();
}
}
}
execplan::ObjectIDManager oidm;
BRM::OID_t MaxOID = oidm.size();
for (BRM::OID_t oid = 3000; oid <= MaxOID; oid++)
if (!cond_oid)
{
emp->getExtents(oid, entries, false, false, true);
if (entries.size() == 0)
continue;
std::vector<struct BRM::EMEntry>::const_iterator iter = entries.begin();
while ( iter != entries.end() ) //organize extents into files
for (BRM::OID_t oid = 3000; oid <= MaxOID; oid++)
{
// Don't include files more than once at different block offsets
if (iter->blockOffset > 0)
{
iter++;
continue;
}
int result = generate_result(oid, emp, table, thd);
try
{
oam_instance.getDbrootPmConfig(iter->dbRoot, pmId);
}
catch (std::runtime_error)
{
// MCOL-1116: If we are here a DBRoot is offline/missing
iter++;
continue;
}
table->field[0]->store(oid);
table->field[1]->store(iter->segmentNum);
table->field[2]->store(iter->partitionNum);
WriteEngine::Convertor::oid2FileName(oid, oidDirName, dbDir, iter->partitionNum, iter->segmentNum);
std::stringstream DbRootName;
DbRootName << "DBRoot" << iter->dbRoot;
std::string DbRootPath = config->getConfig("SystemConfig", DbRootName.str());
fileSize = compressedFileSize = 0;
snprintf(fullFileName, WriteEngine::FILE_NAME_SIZE, "%s/%s", DbRootPath.c_str(), oidDirName);
std::ostringstream oss;
oss << "pm" << pmId << "_WriteEngineServer";
std::string client = oss.str();
msgQueueClient = messageqcpp::MessageQueueClientPool::getInstance(oss.str());
if (!get_file_sizes(msgQueueClient, fullFileName, &fileSize, &compressedFileSize))
{
messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient);
delete emp;
if (result)
return 1;
}
table->field[3]->store(fullFileName, strlen(fullFileName), cs);
if (fileSize > 0)
{
table->field[4]->set_notnull();
table->field[4]->store(fileSize);
if (compressedFileSize > 0)
{
table->field[5]->set_notnull();
table->field[5]->store(compressedFileSize);
}
else
{
table->field[5]->set_null();
}
}
else
{
table->field[4]->set_null();
table->field[5]->set_null();
}
if (schema_table_store_record(thd, table))
{
messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient);
delete emp;
return 1;
}
iter++;
messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient);
msgQueueClient = NULL;
}
}

View File

@@ -42,22 +42,95 @@ ST_FIELD_INFO is_columnstore_tables_fields[] =
{0, 0, MYSQL_TYPE_NULL, 0, 0, 0, 0}
};
static void get_cond_item(Item_func* item, String** table, String** db)
{
char tmp_char[MAX_FIELD_WIDTH];
Item_field* item_field = (Item_field*) item->arguments()[0]->real_item();
if (strcasecmp(item_field->field_name.str, "table_name") == 0)
{
String str_buf(tmp_char, sizeof(tmp_char), system_charset_info);
*table = item->arguments()[1]->val_str(&str_buf);
return;
}
else if (strcasecmp(item_field->field_name.str, "table_schema") == 0)
{
String str_buf(tmp_char, sizeof(tmp_char), system_charset_info);
*db = item->arguments()[1]->val_str(&str_buf);
return;
}
}
static void get_cond_items(COND* cond, String** table, String** db)
{
if (cond->type() == Item::FUNC_ITEM)
{
Item_func* fitem = (Item_func*) cond;
if (fitem->arguments()[0]->real_item()->type() == Item::FIELD_ITEM &&
fitem->arguments()[1]->const_item())
{
get_cond_item(fitem, table, db);
}
}
else if ((cond->type() == Item::COND_ITEM) && (((Item_cond*) cond)->functype() == Item_func::COND_AND_FUNC))
{
List_iterator<Item> li(*((Item_cond*) cond)->argument_list());
Item* item;
while ((item = li++))
{
if (item->type() == Item::FUNC_ITEM)
{
get_cond_item((Item_func*)item, table, db);
}
else
{
get_cond_items(item, table, db);
}
}
}
}
static int is_columnstore_tables_fill(THD* thd, TABLE_LIST* tables, COND* cond)
{
CHARSET_INFO* cs = system_charset_info;
TABLE* table = tables->table;
String* table_name = NULL;
String* db_name = NULL;
boost::shared_ptr<execplan::CalpontSystemCatalog> systemCatalogPtr =
execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(execplan::CalpontSystemCatalog::idb_tid2sid(thd->thread_id));
systemCatalogPtr->identity(execplan::CalpontSystemCatalog::FE);
if (cond)
{
get_cond_items(cond, &table_name, &db_name);
}
const std::vector< std::pair<execplan::CalpontSystemCatalog::OID, execplan::CalpontSystemCatalog::TableName> > catalog_tables
= systemCatalogPtr->getTables();
for (std::vector<std::pair<execplan::CalpontSystemCatalog::OID, execplan::CalpontSystemCatalog::TableName> >::const_iterator it = catalog_tables.begin();
it != catalog_tables.end(); ++it)
{
if (db_name)
{
if ((*it).second.schema.compare(db_name->ptr()) != 0)
{
continue;
}
}
if (table_name)
{
if ((*it).second.table.compare(table_name->ptr()) != 0)
{
continue;
}
}
execplan::CalpontSystemCatalog::TableInfo tb_info = systemCatalogPtr->tableInfo((*it).second);
std::string create_date = dataconvert::DataConvert::dateToString((*it).second.create_date);
table->field[0]->store((*it).second.schema.c_str(), (*it).second.schema.length(), cs);

View File

@@ -20,6 +20,7 @@
*
***********************************************************************/
#include <my_global.h>
#include <mysql.h>
#include <my_sys.h>
#include <errmsg.h>
@@ -279,7 +280,7 @@ tpl_open ( tableid_t tableid,
cpsm_tplh_t* ntplh,
cpsm_conhdl_t* conn_hdl)
{
SMDEBUGLOG << "tpl_open: " << conn_hdl << " tableid: " << tableid << endl;
SMDEBUGLOG << "tpl_open: ntplh: " << ntplh << " conn_hdl: " << conn_hdl << " tableid: " << tableid << endl;
// if first time enter this function for a statement, set
// queryState to QUERY_IN_PRCOESS and get execution plan.
@@ -318,7 +319,9 @@ tpl_scan_open ( tableid_t tableid,
sp_cpsm_tplsch_t& ntplsch,
cpsm_conhdl_t* conn_hdl )
{
#if IDB_SM_DEBUG
SMDEBUGLOG << "tpl_scan_open: " << conn_hdl << " tableid: " << tableid << endl;
#endif
// @bug 649. No initialization here. take passed in reference
ntplsch->tableid = tableid;
@@ -353,8 +356,8 @@ tpl_scan_close ( sp_cpsm_tplsch_t& ntplsch )
SMDEBUGLOG << "tpl_scan_close: ";
if (ntplsch)
SMDEBUGLOG << " tableid: " << ntplsch->tableid << endl;
SMDEBUGLOG << "tpl_scan_close: ntplsch " << ntplsch;
SMDEBUGLOG << "tpl_scan_close: tableid: " << ntplsch->tableid << endl;
#endif
ntplsch.reset();
@@ -364,11 +367,12 @@ tpl_scan_close ( sp_cpsm_tplsch_t& ntplsch )
status_t
tpl_close ( cpsm_tplh_t* ntplh,
cpsm_conhdl_t** conn_hdl,
QueryStats& stats )
QueryStats& stats,
bool clear_scan_ctx)
{
cpsm_conhdl_t* hndl = *conn_hdl;
#if IDB_SM_DEBUG
SMDEBUGLOG << "tpl_close: " << hndl;
SMDEBUGLOG << "tpl_close: hndl" << hndl << " ntplh " << ntplh;
if (ntplh)
SMDEBUGLOG << " tableid: " << ntplh->tableid;
@@ -385,7 +389,16 @@ tpl_close ( cpsm_tplh_t* ntplh,
ByteStream::quadbyte qb = 3;
bs << qb;
hndl->write(bs);
// MCOL-1601 Dispose of unused empty RowGroup
if (clear_scan_ctx)
{
bs = hndl->exeMgr->read();
}
#if IDB_SM_DEBUG
SMDEBUGLOG << "tpl_close hndl->exeMgr: " << hndl->exeMgr << endl;
#endif
//keep reading until we get a string
//TODO: really need to fix this! Why is ExeMgr sending other stuff?
for (int tries = 0; tries < 10; tries++)
@@ -414,6 +427,9 @@ tpl_close ( cpsm_tplh_t* ntplh,
{
// querystats messed up. close connection.
// no need to throw for querystats protocol error, like for tablemode.
#if IDB_SM_DEBUG
SMDEBUGLOG << "tpl_close() exception whilst getting stats" << endl;
#endif
end_query(hndl);
sm_cleanup(hndl);
*conn_hdl = 0;
@@ -435,9 +451,9 @@ sm_init ( uint32_t sid,
{
// clear file content
#if IDB_SM_DEBUG
smlog.close();
smlog.open("/tmp/sm.log");
SMDEBUGLOG << "sm_init: " << dboptions << endl;
//smlog.close();
//smlog.open("/tmp/sm.log");
SMDEBUGLOG << "sm_init: " << endl;
#endif
// @bug5660 Connection changes related to the local pm setting
@@ -473,7 +489,6 @@ sm_cleanup ( cpsm_conhdl_t* conn_hdl )
{
#if IDB_SM_DEBUG
SMDEBUGLOG << "sm_cleanup: " << conn_hdl << endl;
SMDEBUGLOG.close();
#endif
delete conn_hdl;

View File

@@ -60,12 +60,12 @@ const int SQL_NOT_FOUND = -1000;
const int SQL_KILLED = -1001;
const int CALPONT_INTERNAL_ERROR = -1007;
#if IDB_SM_DEBUG
extern std::ofstream smlog;
#define SMDEBUGLOG smlog
#else
#define SMDEBUGLOG if (false) std::cerr
#endif
//#if IDB_SM_DEBUG
//extern std::ofstream smlog;
//#define SMDEBUGLOG smlog
//#else
#define SMDEBUGLOG if (true) std::cerr
//#endif
extern const std::string DEFAULT_SAVE_PATH;
typedef uint64_t tableid_t;
@@ -282,7 +282,7 @@ extern status_t tpl_open(tableid_t, cpsm_tplh_t*, cpsm_conhdl_t*);
extern status_t tpl_scan_open(tableid_t, sp_cpsm_tplsch_t&, cpsm_conhdl_t*);
extern status_t tpl_scan_fetch(sp_cpsm_tplsch_t&, cpsm_conhdl_t*, int* k = 0);
extern status_t tpl_scan_close(sp_cpsm_tplsch_t&);
extern status_t tpl_close(cpsm_tplh_t*, cpsm_conhdl_t**, querystats::QueryStats& stats);
extern status_t tpl_close(cpsm_tplh_t*, cpsm_conhdl_t**, querystats::QueryStats& stats, bool clear_scan_ctx = false);
}