diff --git a/dbcon/mysql/columnstore_info.sql b/dbcon/mysql/columnstore_info.sql index 563052a11..d0433a0d9 100644 --- a/dbcon/mysql/columnstore_info.sql +++ b/dbcon/mysql/columnstore_info.sql @@ -37,43 +37,56 @@ DROP PROCEDURE IF EXISTS `table_usage` // CREATE PROCEDURE table_usage (IN t_schema char(64), IN t_name char(64)) `table_usage`: BEGIN + DECLARE done INTEGER DEFAULT 0; + DECLARE dbname VARCHAR(64); + DECLARE tbname VARCHAR(64); + DECLARE object_ids TEXT; + DECLARE dictionary_object_ids TEXT; DECLARE `locker` TINYINT UNSIGNED DEFAULT IS_USED_LOCK('table_usage'); - + DECLARE columns_list CURSOR FOR SELECT TABLE_SCHEMA, TABLE_NAME, GROUP_CONCAT(object_id) OBJECT_IDS, GROUP_CONCAT(dictionary_object_id) DICT_OBJECT_IDS FROM INFORMATION_SCHEMA.COLUMNSTORE_COLUMNS WHERE table_name = t_name and table_schema = t_schema GROUP BY table_schema, table_name; + DECLARE columns_list_sc CURSOR FOR SELECT TABLE_SCHEMA, TABLE_NAME, GROUP_CONCAT(object_id) OBJECT_IDS, GROUP_CONCAT(dictionary_object_id) DICT_OBJECT_IDS FROM INFORMATION_SCHEMA.COLUMNSTORE_COLUMNS WHERE table_schema = t_schema GROUP BY table_schema, table_name; + DECLARE columns_list_all CURSOR FOR SELECT TABLE_SCHEMA, TABLE_NAME, GROUP_CONCAT(object_id) OBJECT_IDS, GROUP_CONCAT(dictionary_object_id) DICT_OBJECT_IDS FROM INFORMATION_SCHEMA.COLUMNSTORE_COLUMNS GROUP BY table_schema, table_name; + DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = 1; IF `locker` IS NOT NULL THEN SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Error acquiring table_usage lock'; LEAVE `table_usage`; END IF; DO GET_LOCK('table_usage', 0); - DROP TABLE IF EXISTS columnstore_info.columnstore_columns; DROP TABLE IF EXISTS columnstore_info.columnstore_files; - CREATE TABLE columnstore_info.columnstore_columns engine=myisam as (select * from information_schema.columnstore_columns); - ALTER TABLE columnstore_info.columnstore_columns ADD INDEX `object_id` (`object_id`); - ALTER TABLE columnstore_info.columnstore_columns ADD INDEX `dictionary_object_id` (`dictionary_object_id`); - CREATE TABLE columnstore_info.columnstore_files engine=myisam as (select * from information_schema.columnstore_files); - ALTER TABLE columnstore_info.columnstore_files ADD INDEX `object_id` (`object_id`); + CREATE TEMPORARY TABLE columnstore_info.columnstore_files (TABLE_SCHEMA VARCHAR(64), TABLE_NAME VARCHAR(64), DATA BIGINT, DICT BIGINT); + IF t_name IS NOT NULL THEN -SELECT TABLE_SCHEMA, TABLE_NAME, columnstore_info.format_filesize(data) as DATA_DISK_USAGE, columnstore_info.format_filesize(dict) as DICT_DISK_USAGE, columnstore_info.format_filesize(data + COALESCE(dict, 0)) as TOTAL_USAGE FROM ( -SELECT TABLE_SCHEMA, TABLE_NAME, (SELECT sum(cf.file_size) as data FROM columnstore_info.columnstore_columns cc JOIN columnstore_info.columnstore_files cf ON cc.object_id = cf.object_id WHERE table_name = ics.table_name and table_schema = ics.table_schema) as data, (SELECT sum(cf.file_size) as dict FROM columnstore_info.columnstore_columns cc JOIN columnstore_info.columnstore_files cf ON cc.dictionary_object_id = cf.object_id WHERE table_name = ics.table_name and table_schema = ics.table_schema GROUP BY table_schema, table_name) as dict -FROM -columnstore_info.columnstore_columns ics where table_name = t_name and (table_schema = t_schema or t_schema IS NULL) -group by table_schema, table_name -) q; + OPEN columns_list; ELSEIF t_schema IS NOT NULL THEN -SELECT TABLE_SCHEMA, TABLE_NAME, columnstore_info.format_filesize(data) as DATA_DISK_USAGE, columnstore_info.format_filesize(dict) as DICT_DISK_USAGE, columnstore_info.format_filesize(data + COALESCE(dict, 0)) as TOTAL_USAGE FROM ( -SELECT TABLE_SCHEMA, TABLE_NAME, (SELECT sum(cf.file_size) as data FROM columnstore_info.columnstore_columns cc JOIN columnstore_info.columnstore_files cf ON cc.object_id = cf.object_id WHERE table_name = ics.table_name and table_schema = ics.table_schema) as data, (SELECT sum(cf.file_size) as dict FROM columnstore_info.columnstore_columns cc JOIN columnstore_info.columnstore_files cf ON cc.dictionary_object_id = cf.object_id WHERE table_name = ics.table_name and table_schema = ics.table_schema GROUP BY table_schema, table_name) as dict -FROM -columnstore_info.columnstore_columns ics where table_schema = t_schema -group by table_schema, table_name -) q; + OPEN columns_list_sc; ELSE -SELECT TABLE_SCHEMA, TABLE_NAME, columnstore_info.format_filesize(data) as DATA_DISK_USAGE, columnstore_info.format_filesize(dict) as DICT_DISK_USAGE, columnstore_info.format_filesize(data + COALESCE(dict, 0)) as TOTAL_USAGE FROM ( -SELECT TABLE_SCHEMA, TABLE_NAME, (SELECT sum(cf.file_size) as data FROM columnstore_info.columnstore_columns cc JOIN columnstore_info.columnstore_files cf ON cc.object_id = cf.object_id WHERE table_name = ics.table_name and table_schema = ics.table_schema) as data, (SELECT sum(cf.file_size) as dict FROM columnstore_info.columnstore_columns cc JOIN columnstore_info.columnstore_files cf ON cc.dictionary_object_id = cf.object_id WHERE table_name = ics.table_name and table_schema = ics.table_schema GROUP BY table_schema, table_name) as dict -FROM -columnstore_info.columnstore_columns ics -group by table_schema, table_name -) q; + OPEN columns_list_all; END IF; - DROP TABLE IF EXISTS columnstore_info.columnstore_columns; + + files_table: LOOP + IF t_name IS NOT NULL THEN + FETCH columns_list INTO dbname, tbname, object_ids, dictionary_object_ids; + ELSEIF t_schema IS NOT NULL THEN + FETCH columns_list_sc INTO dbname, tbname, object_ids, dictionary_object_ids; + ELSE + FETCH columns_list_all INTO dbname, tbname, object_ids, dictionary_object_ids; + END IF; + IF done = 1 THEN LEAVE files_table; + END IF; + INSERT INTO columnstore_info.columnstore_files (SELECT dbname, tbname, sum(file_size), 0 FROM information_schema.columnstore_files WHERE find_in_set(object_id, object_ids)); + IF dictionary_object_ids IS NOT NULL THEN + UPDATE columnstore_info.columnstore_files SET DICT = (SELECT sum(file_size) FROM information_schema.columnstore_files WHERE find_in_set(object_id, dictionary_object_ids)) WHERE TABLE_SCHEMA = dbname AND TABLE_NAME = tbname; + END IF; + END LOOP; + IF t_name IS NOT NULL THEN + CLOSE columns_list; + ELSEIF t_schema IS NOT NULL THEN + CLOSE columns_list_sc; + ELSE + CLOSE columns_list_all; + END IF; + SELECT TABLE_SCHEMA, TABLE_NAME, columnstore_info.format_filesize(DATA) as DATA_DISK_USAGE, columnstore_info.format_filesize(DICT) as DICT_DATA_USAGE, columnstore_info.format_filesize(DATA + COALESCE(DICT, 0)) as TOTAL_USAGE FROM columnstore_info.columnstore_files; + DROP TABLE IF EXISTS columnstore_info.columnstore_files; DO RELEASE_LOCK('table_usage'); END // diff --git a/dbcon/mysql/is_columnstore_columns.cpp b/dbcon/mysql/is_columnstore_columns.cpp index b446eed4e..e2f338782 100644 --- a/dbcon/mysql/is_columnstore_columns.cpp +++ b/dbcon/mysql/is_columnstore_columns.cpp @@ -56,10 +56,59 @@ ST_FIELD_INFO is_columnstore_columns_fields[] = }; +static void get_cond_item(Item_func *item, String **table, String **db) +{ + char tmp_char[MAX_FIELD_WIDTH]; + Item_field *item_field = (Item_field*) item->arguments()[0]->real_item(); + if (strcasecmp(item_field->field_name, "table_name") == 0) + { + String str_buf(tmp_char, sizeof(tmp_char), system_charset_info); + *table = item->arguments()[1]->val_str(&str_buf); + return; + } + else if (strcasecmp(item_field->field_name, "table_schema") == 0) + { + String str_buf(tmp_char, sizeof(tmp_char), system_charset_info); + *db = item->arguments()[1]->val_str(&str_buf); + return; + } +} + +static void get_cond_items(COND *cond, String **table, String **db) +{ + if (cond->type() == Item::FUNC_ITEM) + { + Item_func* fitem = (Item_func*) cond; + if (fitem->arguments()[0]->real_item()->type() == Item::FIELD_ITEM && + fitem->arguments()[1]->const_item()) + { + get_cond_item(fitem, table, db); + } + } + else if ((cond->type() == Item::COND_ITEM) && (((Item_cond*) cond)->functype() == Item_func::COND_AND_FUNC)) + { + List_iterator li(*((Item_cond*) cond)->argument_list()); + Item *item; + while ((item= li++)) + { + if (item->type() == Item::FUNC_ITEM) + { + get_cond_item((Item_func*)item, table, db); + } + else + { + get_cond_items(item, table, db); + } + } + } +} + static int is_columnstore_columns_fill(THD *thd, TABLE_LIST *tables, COND *cond) { CHARSET_INFO *cs = system_charset_info; TABLE *table = tables->table; + String *table_name = NULL; + String *db_name = NULL; boost::shared_ptr systemCatalogPtr = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(execplan::CalpontSystemCatalog::idb_tid2sid(thd->thread_id)); @@ -69,9 +118,29 @@ static int is_columnstore_columns_fill(THD *thd, TABLE_LIST *tables, COND *cond) systemCatalogPtr->identity(execplan::CalpontSystemCatalog::FE); + if (cond) + { + get_cond_items(cond, &table_name, &db_name); + } + for (std::vector >::const_iterator it = catalog_tables.begin(); it != catalog_tables.end(); ++it) { + if (db_name) + { + if ((*it).second.schema.compare(db_name->ptr()) != 0) + { + continue; + } + } + if (table_name) + { + if ((*it).second.table.compare(table_name->ptr()) != 0) + { + continue; + } + } + execplan::CalpontSystemCatalog::RIDList column_rid_list; // Note a table may get dropped as you iterate over the list of tables. // So simply ignore the dropped table. @@ -168,8 +237,6 @@ static int is_columnstore_columns_fill(THD *thd, TABLE_LIST *tables, COND *cond) } } - - return 0; } diff --git a/dbcon/mysql/is_columnstore_extents.cpp b/dbcon/mysql/is_columnstore_extents.cpp index 4ee4cbce6..27eceeafc 100644 --- a/dbcon/mysql/is_columnstore_extents.cpp +++ b/dbcon/mysql/is_columnstore_extents.cpp @@ -52,131 +52,200 @@ ST_FIELD_INFO is_columnstore_extents_fields[] = {0, 0, MYSQL_TYPE_NULL, 0, 0, 0, 0} }; -static int is_columnstore_extents_fill(THD *thd, TABLE_LIST *tables, COND *cond) +static int generate_result(BRM::OID_t oid, BRM::DBRM *emp, TABLE *table, THD *thd) { CHARSET_INFO *cs = system_charset_info; - TABLE *table = tables->table; std::vector entries; std::vector::iterator iter; std::vector::iterator end; + emp->getExtents(oid, entries, false, false, true); + if (entries.size() == 0) + return 0; + + iter = entries.begin(); + end = entries.end(); + while (iter != end) + { + table->field[0]->store(oid); + if (iter->colWid > 0) + { + table->field[1]->store("Column", strlen("Column"), cs); + if (iter->partition.cprange.lo_val == std::numeric_limits::max() || + iter->partition.cprange.lo_val <= (std::numeric_limits::min() + 2)) + { + table->field[4]->set_null(); + } + else + { + table->field[4]->set_notnull(); + table->field[4]->store(iter->partition.cprange.lo_val); + } + if (iter->partition.cprange.hi_val == std::numeric_limits::max() || + iter->partition.cprange.hi_val <= (std::numeric_limits::min() + 2)) + { + table->field[5]->set_null(); + } + else + { + table->field[5]->set_notnull(); + table->field[5]->store(iter->partition.cprange.hi_val); + } + table->field[6]->store(iter->colWid); + + } + else + { + table->field[1]->store("Dictionary", strlen("Dictionary"), cs); + table->field[4]->set_null(); + table->field[5]->set_null(); + table->field[6]->store(8192); + } + table->field[2]->store(iter->range.start); + table->field[3]->store(iter->range.start + (iter->range.size * 1024) - 1); + + table->field[7]->store(iter->dbRoot); + table->field[8]->store(iter->partitionNum); + table->field[9]->store(iter->segmentNum); + table->field[10]->store(iter->blockOffset); + table->field[11]->store(iter->range.size * 1024); + table->field[12]->store(iter->HWM); + + switch (iter->partition.cprange.isValid) + { + case 0: + table->field[13]->store("Invalid", strlen("Invalid"), cs); + break; + case 1: + table->field[13]->store("Updating", strlen("Updating"), cs); + break; + case 2: + table->field[13]->store("Valid", strlen("Valid"), cs); + break; + default: + table->field[13]->store("Unknown", strlen("Unknown"), cs); + break; + } + switch (iter->status) + { + case BRM::EXTENTAVAILABLE: + table->field[14]->store("Available", strlen("Available"), cs); + break; + case BRM::EXTENTUNAVAILABLE: + table->field[14]->store("Unavailable", strlen("Unavailable"), cs); + break; + case BRM::EXTENTOUTOFSERVICE: + table->field[14]->store("Out of service", strlen("Out of service"), cs); + break; + default: + table->field[14]->store("Unknown", strlen("Unknown"), cs); + } + // MCOL-1016: on multiple segments HWM is set to 0 on the lower + // segments, we don't want these to show as 8KB. The down side is + // if the column has less than 1 block it will show as 0 bytes. + // We have no lookahead without it getting messy so this is the + // best compromise. + if (iter->HWM == 0) + { + table->field[15]->store(0); + } + else + { + table->field[15]->store((iter->HWM + 1) * 8192); + } + + if (schema_table_store_record(thd, table)) + { + delete emp; + return 1; + } + + iter++; + + } + return 0; +} + +static int is_columnstore_extents_fill(THD *thd, TABLE_LIST *tables, COND *cond) +{ + BRM::OID_t cond_oid = 0; + TABLE *table = tables->table; + BRM::DBRM *emp = new BRM::DBRM(); if (!emp || !emp->isDBRMReady()) { return 1; } + if (cond && cond->type() == Item::FUNC_ITEM) + { + Item_func* fitem = (Item_func*) cond; + if ((fitem->functype() == Item_func::EQ_FUNC) && (fitem->argument_count() == 2)) + { + if(fitem->arguments()[0]->real_item()->type() == Item::FIELD_ITEM && + fitem->arguments()[1]->const_item()) + { + // WHERE object_id = value + Item_field *item_field = (Item_field*) fitem->arguments()[0]->real_item(); + if (strcasecmp(item_field->field_name, "object_id") == 0) + { + cond_oid = fitem->arguments()[1]->val_int(); + return generate_result(cond_oid, emp, table, thd); + } + } + else if (fitem->arguments()[1]->real_item()->type() == Item::FIELD_ITEM && + fitem->arguments()[0]->const_item()) + { + // WHERE value = object_id + Item_field *item_field = (Item_field*) fitem->arguments()[1]->real_item(); + if (strcasecmp(item_field->field_name, "object_id") == 0) + { + cond_oid = fitem->arguments()[0]->val_int(); + return generate_result(cond_oid, emp, table, thd); + } + } + } + else if (fitem->functype() == Item_func::IN_FUNC) + { + // WHERE object_id in (value1, value2) + Item_field *item_field = (Item_field*) fitem->arguments()[0]->real_item(); + if (strcasecmp(item_field->field_name, "object_id") == 0) + { + for (unsigned int i=1; i < fitem->argument_count(); i++) + { + cond_oid = fitem->arguments()[i]->val_int(); + int result = generate_result(cond_oid, emp, table, thd); + if (result) + return 1; + } + } + } + else if (fitem->functype() == Item_func::UNKNOWN_FUNC && + strcasecmp(fitem->func_name(), "find_in_set") == 0) + { + // WHERE FIND_IN_SET(object_id, values) + String *tmp_var = fitem->arguments()[1]->val_str(); + std::stringstream ss(tmp_var->ptr()); + while (ss >> cond_oid) + { + int ret = generate_result(cond_oid, emp, table, thd); + if (ret) + return 1; + if (ss.peek() == ',') + ss.ignore(); + } + } + } + execplan::ObjectIDManager oidm; BRM::OID_t MaxOID = oidm.size(); for(BRM::OID_t oid = 3000; oid <= MaxOID; oid++) { - emp->getExtents(oid, entries, false, false, true); - if (entries.size() == 0) - continue; - - iter = entries.begin(); - end = entries.end(); - while (iter != end) - { - table->field[0]->store(oid); - if (iter->colWid > 0) - { - table->field[1]->store("Column", strlen("Column"), cs); - if (iter->partition.cprange.lo_val == std::numeric_limits::max() || - iter->partition.cprange.lo_val <= (std::numeric_limits::min() + 2)) - { - table->field[4]->set_null(); - } - else - { - table->field[4]->set_notnull(); - table->field[4]->store(iter->partition.cprange.lo_val); - } - if (iter->partition.cprange.hi_val == std::numeric_limits::max() || - iter->partition.cprange.hi_val <= (std::numeric_limits::min() + 2)) - { - table->field[5]->set_null(); - } - else - { - table->field[5]->set_notnull(); - table->field[5]->store(iter->partition.cprange.hi_val); - } - table->field[6]->store(iter->colWid); - - } - else - { - table->field[1]->store("Dictionary", strlen("Dictionary"), cs); - table->field[4]->set_null(); - table->field[5]->set_null(); - table->field[6]->store(8192); - } - table->field[2]->store(iter->range.start); - table->field[3]->store(iter->range.start + (iter->range.size * 1024) - 1); - - table->field[7]->store(iter->dbRoot); - table->field[8]->store(iter->partitionNum); - table->field[9]->store(iter->segmentNum); - table->field[10]->store(iter->blockOffset); - table->field[11]->store(iter->range.size * 1024); - table->field[12]->store(iter->HWM); - - switch (iter->partition.cprange.isValid) - { - case 0: - table->field[13]->store("Invalid", strlen("Invalid"), cs); - break; - case 1: - table->field[13]->store("Updating", strlen("Updating"), cs); - break; - case 2: - table->field[13]->store("Valid", strlen("Valid"), cs); - break; - default: - table->field[13]->store("Unknown", strlen("Unknown"), cs); - break; - } - switch (iter->status) - { - case BRM::EXTENTAVAILABLE: - table->field[14]->store("Available", strlen("Available"), cs); - break; - case BRM::EXTENTUNAVAILABLE: - table->field[14]->store("Unavailable", strlen("Unavailable"), cs); - break; - case BRM::EXTENTOUTOFSERVICE: - table->field[14]->store("Out of service", strlen("Out of service"), cs); - break; - default: - table->field[14]->store("Unknown", strlen("Unknown"), cs); - } - // MCOL-1016: on multiple segments HWM is set to 0 on the lower - // segments, we don't want these to show as 8KB. The down side is - // if the column has less than 1 block it will show as 0 bytes. - // We have no lookahead without it getting messy so this is the - // best compromise. - if (iter->HWM == 0) - { - table->field[15]->store(0); - } - else - { - table->field[15]->store((iter->HWM + 1) * 8192); - } - - if (schema_table_store_record(thd, table)) - { - delete emp; - return 1; - } - - iter++; - - } + int result = generate_result(oid, emp, table, thd); + if (result) + return 1; } - delete emp; return 0; } diff --git a/dbcon/mysql/is_columnstore_files.cpp b/dbcon/mysql/is_columnstore_files.cpp index ce00b8aae..1a5fdad1e 100644 --- a/dbcon/mysql/is_columnstore_files.cpp +++ b/dbcon/mysql/is_columnstore_files.cpp @@ -82,12 +82,10 @@ static bool get_file_sizes(messageqcpp::MessageQueueClient *msgQueueClient, cons } } -static int is_columnstore_files_fill(THD *thd, TABLE_LIST *tables, COND *cond) +static int generate_result(BRM::OID_t oid, BRM::DBRM *emp, TABLE *table, THD *thd) { - BRM::DBRM *emp = new BRM::DBRM(); std::vector entries; CHARSET_INFO *cs = system_charset_info; - TABLE *table = tables->table; char oidDirName[WriteEngine::FILE_NAME_SIZE]; char fullFileName[WriteEngine::FILE_NAME_SIZE]; @@ -101,93 +99,168 @@ static int is_columnstore_files_fill(THD *thd, TABLE_LIST *tables, COND *cond) oam::Oam oam_instance; int pmId = 0; + emp->getExtents(oid, entries, false, false, true); + if (entries.size() == 0) + return 0; + + std::vector::const_iterator iter = entries.begin(); + while ( iter != entries.end() ) //organize extents into files + { + // Don't include files more than once at different block offsets + if (iter->blockOffset > 0) + { + iter++; + return 0; + } + + try + { + oam_instance.getDbrootPmConfig(iter->dbRoot, pmId); + } + catch (std::runtime_error) + { + // MCOL-1116: If we are here a DBRoot is offline/missing + iter++; + return 0; + } + table->field[0]->store(oid); + table->field[1]->store(iter->segmentNum); + table->field[2]->store(iter->partitionNum); + + WriteEngine::Convertor::oid2FileName(oid, oidDirName, dbDir, iter->partitionNum, iter->segmentNum); + std::stringstream DbRootName; + DbRootName << "DBRoot" << iter->dbRoot; + std::string DbRootPath = config->getConfig("SystemConfig", DbRootName.str()); + fileSize = compressedFileSize = 0; + snprintf(fullFileName, WriteEngine::FILE_NAME_SIZE, "%s/%s", DbRootPath.c_str(), oidDirName); + + std::ostringstream oss; + oss << "pm" << pmId << "_WriteEngineServer"; + std::string client = oss.str(); + msgQueueClient = messageqcpp::MessageQueueClientPool::getInstance(oss.str()); + + if (!get_file_sizes(msgQueueClient, fullFileName, &fileSize, &compressedFileSize)) + { + messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient); + delete emp; + return 1; + } + table->field[3]->store(fullFileName, strlen(fullFileName), cs); + + if (fileSize > 0) + { + table->field[4]->set_notnull(); + table->field[4]->store(fileSize); + if (compressedFileSize > 0) + { + table->field[5]->set_notnull(); + table->field[5]->store(compressedFileSize); + } + else + { + table->field[5]->set_null(); + } + } + else + { + table->field[4]->set_null(); + table->field[5]->set_null(); + } + + if (schema_table_store_record(thd, table)) + { + messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient); + delete emp; + return 1; + } + iter++; + messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient); + msgQueueClient = NULL; + } + return 0; +} + +static int is_columnstore_files_fill(THD *thd, TABLE_LIST *tables, COND *cond) +{ + BRM::DBRM *emp = new BRM::DBRM(); + BRM::OID_t cond_oid = 0; + TABLE *table = tables->table; + if (!emp || !emp->isDBRMReady()) { return 1; } + if (cond && cond->type() == Item::FUNC_ITEM) + { + Item_func* fitem = (Item_func*) cond; + if ((fitem->functype() == Item_func::EQ_FUNC) && (fitem->argument_count() == 2)) + { + if(fitem->arguments()[0]->real_item()->type() == Item::FIELD_ITEM && + fitem->arguments()[1]->const_item()) + { + // WHERE object_id = value + Item_field *item_field = (Item_field*) fitem->arguments()[0]->real_item(); + if (strcasecmp(item_field->field_name, "object_id") == 0) + { + cond_oid = fitem->arguments()[1]->val_int(); + return generate_result(cond_oid, emp, table, thd); + } + } + else if (fitem->arguments()[1]->real_item()->type() == Item::FIELD_ITEM && + fitem->arguments()[0]->const_item()) + { + // WHERE value = object_id + Item_field *item_field = (Item_field*) fitem->arguments()[1]->real_item(); + if (strcasecmp(item_field->field_name, "object_id") == 0) + { + cond_oid = fitem->arguments()[0]->val_int(); + return generate_result(cond_oid, emp, table, thd); + } + } + } + else if (fitem->functype() == Item_func::IN_FUNC) + { + // WHERE object_id in (value1, value2) + Item_field *item_field = (Item_field*) fitem->arguments()[0]->real_item(); + if (strcasecmp(item_field->field_name, "object_id") == 0) + { + for (unsigned int i=1; i < fitem->argument_count(); i++) + { + cond_oid = fitem->arguments()[i]->val_int(); + int result = generate_result(cond_oid, emp, table, thd); + if (result) + return 1; + } + } + } + else if (fitem->functype() == Item_func::UNKNOWN_FUNC && + strcasecmp(fitem->func_name(), "find_in_set") == 0) + { + // WHERE FIND_IN_SET(object_id, values) + String *tmp_var = fitem->arguments()[1]->val_str(); + std::stringstream ss(tmp_var->ptr()); + while (ss >> cond_oid) + { + int ret = generate_result(cond_oid, emp, table, thd); + if (ret) + return 1; + if (ss.peek() == ',') + ss.ignore(); + } + } + } + execplan::ObjectIDManager oidm; BRM::OID_t MaxOID = oidm.size(); - for(BRM::OID_t oid = 3000; oid <= MaxOID; oid++) + if (!cond_oid) { - emp->getExtents(oid, entries, false, false, true); - if (entries.size() == 0) - continue; - - std::vector::const_iterator iter = entries.begin(); - while ( iter != entries.end() ) //organize extents into files + for(BRM::OID_t oid = 3000; oid <= MaxOID; oid++) { - // Don't include files more than once at different block offsets - if (iter->blockOffset > 0) - { - iter++; - continue; - } - - try - { - oam_instance.getDbrootPmConfig(iter->dbRoot, pmId); - } - catch (std::runtime_error) - { - // MCOL-1116: If we are here a DBRoot is offline/missing - iter++; - continue; - } - table->field[0]->store(oid); - table->field[1]->store(iter->segmentNum); - table->field[2]->store(iter->partitionNum); - - WriteEngine::Convertor::oid2FileName(oid, oidDirName, dbDir, iter->partitionNum, iter->segmentNum); - std::stringstream DbRootName; - DbRootName << "DBRoot" << iter->dbRoot; - std::string DbRootPath = config->getConfig("SystemConfig", DbRootName.str()); - fileSize = compressedFileSize = 0; - snprintf(fullFileName, WriteEngine::FILE_NAME_SIZE, "%s/%s", DbRootPath.c_str(), oidDirName); - - std::ostringstream oss; - oss << "pm" << pmId << "_WriteEngineServer"; - std::string client = oss.str(); - msgQueueClient = messageqcpp::MessageQueueClientPool::getInstance(oss.str()); - - if (!get_file_sizes(msgQueueClient, fullFileName, &fileSize, &compressedFileSize)) - { - messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient); - delete emp; + int result = generate_result(oid, emp, table, thd); + if (result) return 1; - } - table->field[3]->store(fullFileName, strlen(fullFileName), cs); - - if (fileSize > 0) - { - table->field[4]->set_notnull(); - table->field[4]->store(fileSize); - if (compressedFileSize > 0) - { - table->field[5]->set_notnull(); - table->field[5]->store(compressedFileSize); - } - else - { - table->field[5]->set_null(); - } - } - else - { - table->field[4]->set_null(); - table->field[5]->set_null(); - } - - if (schema_table_store_record(thd, table)) - { - messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient); - delete emp; - return 1; - } - iter++; - messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient); - msgQueueClient = NULL; } } delete emp; diff --git a/dbcon/mysql/is_columnstore_tables.cpp b/dbcon/mysql/is_columnstore_tables.cpp index 47ce4970c..baa894487 100644 --- a/dbcon/mysql/is_columnstore_tables.cpp +++ b/dbcon/mysql/is_columnstore_tables.cpp @@ -42,22 +42,91 @@ ST_FIELD_INFO is_columnstore_tables_fields[] = {0, 0, MYSQL_TYPE_NULL, 0, 0, 0, 0} }; +static void get_cond_item(Item_func *item, String **table, String **db) +{ + char tmp_char[MAX_FIELD_WIDTH]; + Item_field *item_field = (Item_field*) item->arguments()[0]->real_item(); + if (strcasecmp(item_field->field_name, "table_name") == 0) + { + String str_buf(tmp_char, sizeof(tmp_char), system_charset_info); + *table = item->arguments()[1]->val_str(&str_buf); + return; + } + else if (strcasecmp(item_field->field_name, "table_schema") == 0) + { + String str_buf(tmp_char, sizeof(tmp_char), system_charset_info); + *db = item->arguments()[1]->val_str(&str_buf); + return; + } +} + +static void get_cond_items(COND *cond, String **table, String **db) +{ + if (cond->type() == Item::FUNC_ITEM) + { + Item_func* fitem = (Item_func*) cond; + if (fitem->arguments()[0]->real_item()->type() == Item::FIELD_ITEM && + fitem->arguments()[1]->const_item()) + { + get_cond_item(fitem, table, db); + } + } + else if ((cond->type() == Item::COND_ITEM) && (((Item_cond*) cond)->functype() == Item_func::COND_AND_FUNC)) + { + List_iterator li(*((Item_cond*) cond)->argument_list()); + Item *item; + while ((item= li++)) + { + if (item->type() == Item::FUNC_ITEM) + { + get_cond_item((Item_func*)item, table, db); + } + else + { + get_cond_items(item, table, db); + } + } + } +} + static int is_columnstore_tables_fill(THD *thd, TABLE_LIST *tables, COND *cond) { CHARSET_INFO *cs = system_charset_info; TABLE *table = tables->table; + String *table_name = NULL; + String *db_name = NULL; boost::shared_ptr systemCatalogPtr = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(execplan::CalpontSystemCatalog::idb_tid2sid(thd->thread_id)); systemCatalogPtr->identity(execplan::CalpontSystemCatalog::FE); + if (cond) + { + get_cond_items(cond, &table_name, &db_name); + } + const std::vector< std::pair > catalog_tables = systemCatalogPtr->getTables(); for (std::vector >::const_iterator it = catalog_tables.begin(); it != catalog_tables.end(); ++it) { + if (db_name) + { + if ((*it).second.schema.compare(db_name->ptr()) != 0) + { + continue; + } + } + if (table_name) + { + if ((*it).second.table.compare(table_name->ptr()) != 0) + { + continue; + } + } + execplan::CalpontSystemCatalog::TableInfo tb_info = systemCatalogPtr->tableInfo((*it).second); std::string create_date = dataconvert::DataConvert::dateToString((*it).second.create_date); table->field[0]->store((*it).second.schema.c_str(), (*it).second.schema.length(), cs);