diff --git a/dbcon/joblist/crossenginestep.cpp b/dbcon/joblist/crossenginestep.cpp index 789e58cd4..d3cef7928 100644 --- a/dbcon/joblist/crossenginestep.cpp +++ b/dbcon/joblist/crossenginestep.cpp @@ -744,7 +744,6 @@ string CrossEngineStep::makeQuery() // the string must consist of a single SQL statement without a terminating semicolon ; or \g. // oss << ";"; - return oss.str(); } diff --git a/dbcon/mysql/columnstore_info.sql b/dbcon/mysql/columnstore_info.sql index 563052a11..d0433a0d9 100644 --- a/dbcon/mysql/columnstore_info.sql +++ b/dbcon/mysql/columnstore_info.sql @@ -37,43 +37,56 @@ DROP PROCEDURE IF EXISTS `table_usage` // CREATE PROCEDURE table_usage (IN t_schema char(64), IN t_name char(64)) `table_usage`: BEGIN + DECLARE done INTEGER DEFAULT 0; + DECLARE dbname VARCHAR(64); + DECLARE tbname VARCHAR(64); + DECLARE object_ids TEXT; + DECLARE dictionary_object_ids TEXT; DECLARE `locker` TINYINT UNSIGNED DEFAULT IS_USED_LOCK('table_usage'); - + DECLARE columns_list CURSOR FOR SELECT TABLE_SCHEMA, TABLE_NAME, GROUP_CONCAT(object_id) OBJECT_IDS, GROUP_CONCAT(dictionary_object_id) DICT_OBJECT_IDS FROM INFORMATION_SCHEMA.COLUMNSTORE_COLUMNS WHERE table_name = t_name and table_schema = t_schema GROUP BY table_schema, table_name; + DECLARE columns_list_sc CURSOR FOR SELECT TABLE_SCHEMA, TABLE_NAME, GROUP_CONCAT(object_id) OBJECT_IDS, GROUP_CONCAT(dictionary_object_id) DICT_OBJECT_IDS FROM INFORMATION_SCHEMA.COLUMNSTORE_COLUMNS WHERE table_schema = t_schema GROUP BY table_schema, table_name; + DECLARE columns_list_all CURSOR FOR SELECT TABLE_SCHEMA, TABLE_NAME, GROUP_CONCAT(object_id) OBJECT_IDS, GROUP_CONCAT(dictionary_object_id) DICT_OBJECT_IDS FROM INFORMATION_SCHEMA.COLUMNSTORE_COLUMNS GROUP BY table_schema, table_name; + DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = 1; IF `locker` IS NOT NULL THEN SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Error acquiring table_usage lock'; LEAVE `table_usage`; END IF; DO GET_LOCK('table_usage', 0); - DROP TABLE IF EXISTS 
columnstore_info.columnstore_columns; DROP TABLE IF EXISTS columnstore_info.columnstore_files; - CREATE TABLE columnstore_info.columnstore_columns engine=myisam as (select * from information_schema.columnstore_columns); - ALTER TABLE columnstore_info.columnstore_columns ADD INDEX `object_id` (`object_id`); - ALTER TABLE columnstore_info.columnstore_columns ADD INDEX `dictionary_object_id` (`dictionary_object_id`); - CREATE TABLE columnstore_info.columnstore_files engine=myisam as (select * from information_schema.columnstore_files); - ALTER TABLE columnstore_info.columnstore_files ADD INDEX `object_id` (`object_id`); + CREATE TEMPORARY TABLE columnstore_info.columnstore_files (TABLE_SCHEMA VARCHAR(64), TABLE_NAME VARCHAR(64), DATA BIGINT, DICT BIGINT); + IF t_name IS NOT NULL THEN -SELECT TABLE_SCHEMA, TABLE_NAME, columnstore_info.format_filesize(data) as DATA_DISK_USAGE, columnstore_info.format_filesize(dict) as DICT_DISK_USAGE, columnstore_info.format_filesize(data + COALESCE(dict, 0)) as TOTAL_USAGE FROM ( -SELECT TABLE_SCHEMA, TABLE_NAME, (SELECT sum(cf.file_size) as data FROM columnstore_info.columnstore_columns cc JOIN columnstore_info.columnstore_files cf ON cc.object_id = cf.object_id WHERE table_name = ics.table_name and table_schema = ics.table_schema) as data, (SELECT sum(cf.file_size) as dict FROM columnstore_info.columnstore_columns cc JOIN columnstore_info.columnstore_files cf ON cc.dictionary_object_id = cf.object_id WHERE table_name = ics.table_name and table_schema = ics.table_schema GROUP BY table_schema, table_name) as dict -FROM -columnstore_info.columnstore_columns ics where table_name = t_name and (table_schema = t_schema or t_schema IS NULL) -group by table_schema, table_name -) q; + OPEN columns_list; ELSEIF t_schema IS NOT NULL THEN -SELECT TABLE_SCHEMA, TABLE_NAME, columnstore_info.format_filesize(data) as DATA_DISK_USAGE, columnstore_info.format_filesize(dict) as DICT_DISK_USAGE, columnstore_info.format_filesize(data + COALESCE(dict, 0)) 
as TOTAL_USAGE FROM ( -SELECT TABLE_SCHEMA, TABLE_NAME, (SELECT sum(cf.file_size) as data FROM columnstore_info.columnstore_columns cc JOIN columnstore_info.columnstore_files cf ON cc.object_id = cf.object_id WHERE table_name = ics.table_name and table_schema = ics.table_schema) as data, (SELECT sum(cf.file_size) as dict FROM columnstore_info.columnstore_columns cc JOIN columnstore_info.columnstore_files cf ON cc.dictionary_object_id = cf.object_id WHERE table_name = ics.table_name and table_schema = ics.table_schema GROUP BY table_schema, table_name) as dict -FROM -columnstore_info.columnstore_columns ics where table_schema = t_schema -group by table_schema, table_name -) q; + OPEN columns_list_sc; ELSE -SELECT TABLE_SCHEMA, TABLE_NAME, columnstore_info.format_filesize(data) as DATA_DISK_USAGE, columnstore_info.format_filesize(dict) as DICT_DISK_USAGE, columnstore_info.format_filesize(data + COALESCE(dict, 0)) as TOTAL_USAGE FROM ( -SELECT TABLE_SCHEMA, TABLE_NAME, (SELECT sum(cf.file_size) as data FROM columnstore_info.columnstore_columns cc JOIN columnstore_info.columnstore_files cf ON cc.object_id = cf.object_id WHERE table_name = ics.table_name and table_schema = ics.table_schema) as data, (SELECT sum(cf.file_size) as dict FROM columnstore_info.columnstore_columns cc JOIN columnstore_info.columnstore_files cf ON cc.dictionary_object_id = cf.object_id WHERE table_name = ics.table_name and table_schema = ics.table_schema GROUP BY table_schema, table_name) as dict -FROM -columnstore_info.columnstore_columns ics -group by table_schema, table_name -) q; + OPEN columns_list_all; END IF; - DROP TABLE IF EXISTS columnstore_info.columnstore_columns; + + files_table: LOOP + IF t_name IS NOT NULL THEN + FETCH columns_list INTO dbname, tbname, object_ids, dictionary_object_ids; + ELSEIF t_schema IS NOT NULL THEN + FETCH columns_list_sc INTO dbname, tbname, object_ids, dictionary_object_ids; + ELSE + FETCH columns_list_all INTO dbname, tbname, object_ids, 
dictionary_object_ids; + END IF; + IF done = 1 THEN LEAVE files_table; + END IF; + INSERT INTO columnstore_info.columnstore_files (SELECT dbname, tbname, sum(file_size), 0 FROM information_schema.columnstore_files WHERE find_in_set(object_id, object_ids)); + IF dictionary_object_ids IS NOT NULL THEN + UPDATE columnstore_info.columnstore_files SET DICT = (SELECT sum(file_size) FROM information_schema.columnstore_files WHERE find_in_set(object_id, dictionary_object_ids)) WHERE TABLE_SCHEMA = dbname AND TABLE_NAME = tbname; + END IF; + END LOOP; + IF t_name IS NOT NULL THEN + CLOSE columns_list; + ELSEIF t_schema IS NOT NULL THEN + CLOSE columns_list_sc; + ELSE + CLOSE columns_list_all; + END IF; + SELECT TABLE_SCHEMA, TABLE_NAME, columnstore_info.format_filesize(DATA) as DATA_DISK_USAGE, columnstore_info.format_filesize(DICT) as DICT_DISK_USAGE, columnstore_info.format_filesize(DATA + COALESCE(DICT, 0)) as TOTAL_USAGE FROM columnstore_info.columnstore_files; + DROP TABLE IF EXISTS columnstore_info.columnstore_files; DO RELEASE_LOCK('table_usage'); END // diff --git a/dbcon/mysql/is_columnstore_columns.cpp b/dbcon/mysql/is_columnstore_columns.cpp index 53f67fbbf..1aa379724 100644 --- a/dbcon/mysql/is_columnstore_columns.cpp +++ b/dbcon/mysql/is_columnstore_columns.cpp @@ -56,10 +56,59 @@ ST_FIELD_INFO is_columnstore_columns_fields[] = }; +static void get_cond_item(Item_func *item, String **table, String **db) +{ + char tmp_char[MAX_FIELD_WIDTH]; + Item_field *item_field = (Item_field*) item->arguments()[0]->real_item(); + if (strcasecmp(item_field->field_name, "table_name") == 0) + { + String str_buf(tmp_char, sizeof(tmp_char), system_charset_info); + *table = item->arguments()[1]->val_str(&str_buf); + return; + } + else if (strcasecmp(item_field->field_name, "table_schema") == 0) + { + String str_buf(tmp_char, sizeof(tmp_char), system_charset_info); + *db = item->arguments()[1]->val_str(&str_buf); + return; + } +} + +static void get_cond_items(COND *cond, String 
**table, String **db) +{ + if (cond->type() == Item::FUNC_ITEM) + { + Item_func* fitem = (Item_func*) cond; + if (fitem->arguments()[0]->real_item()->type() == Item::FIELD_ITEM && + fitem->arguments()[1]->const_item()) + { + get_cond_item(fitem, table, db); + } + } + else if ((cond->type() == Item::COND_ITEM) && (((Item_cond*) cond)->functype() == Item_func::COND_AND_FUNC)) + { + List_iterator li(*((Item_cond*) cond)->argument_list()); + Item *item; + while ((item= li++)) + { + if (item->type() == Item::FUNC_ITEM) + { + get_cond_item((Item_func*)item, table, db); + } + else + { + get_cond_items(item, table, db); + } + } + } +} + static int is_columnstore_columns_fill(THD* thd, TABLE_LIST* tables, COND* cond) { CHARSET_INFO* cs = system_charset_info; TABLE* table = tables->table; + String *table_name = NULL; + String *db_name = NULL; boost::shared_ptr systemCatalogPtr = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(execplan::CalpontSystemCatalog::idb_tid2sid(thd->thread_id)); @@ -69,9 +118,29 @@ static int is_columnstore_columns_fill(THD* thd, TABLE_LIST* tables, COND* cond) systemCatalogPtr->identity(execplan::CalpontSystemCatalog::FE); + if (cond) + { + get_cond_items(cond, &table_name, &db_name); + } + for (std::vector >::const_iterator it = catalog_tables.begin(); it != catalog_tables.end(); ++it) { + if (db_name) + { + if ((*it).second.schema.compare(db_name->ptr()) != 0) + { + continue; + } + } + if (table_name) + { + if ((*it).second.table.compare(table_name->ptr()) != 0) + { + continue; + } + } + execplan::CalpontSystemCatalog::RIDList column_rid_list; // Note a table may get dropped as you iterate over the list of tables. 
@@ -184,8 +253,6 @@ static int is_columnstore_columns_fill(THD* thd, TABLE_LIST* tables, COND* cond) } } - - return 0; } diff --git a/dbcon/mysql/is_columnstore_extents.cpp b/dbcon/mysql/is_columnstore_extents.cpp index 8eb8e15e3..c6dcba567 100644 --- a/dbcon/mysql/is_columnstore_extents.cpp +++ b/dbcon/mysql/is_columnstore_extents.cpp @@ -52,147 +52,216 @@ ST_FIELD_INFO is_columnstore_extents_fields[] = {0, 0, MYSQL_TYPE_NULL, 0, 0, 0, 0} }; -static int is_columnstore_extents_fill(THD* thd, TABLE_LIST* tables, COND* cond) +static int generate_result(BRM::OID_t oid, BRM::DBRM *emp, TABLE *table, THD *thd) { CHARSET_INFO* cs = system_charset_info; - TABLE* table = tables->table; std::vector entries; std::vector::iterator iter; std::vector::iterator end; - BRM::DBRM* emp = new BRM::DBRM(); + emp->getExtents(oid, entries, false, false, true); + + if (entries.size() == 0) + return 0; + + iter = entries.begin(); + end = entries.end(); + + while (iter != end) + { + table->field[0]->store(oid); + + if (iter->colWid > 0) + { + table->field[1]->store("Column", strlen("Column"), cs); + + if (iter->partition.cprange.lo_val == std::numeric_limits::max() || + iter->partition.cprange.lo_val <= (std::numeric_limits::min() + 2)) + { + table->field[4]->set_null(); + } + else + { + table->field[4]->set_notnull(); + table->field[4]->store(iter->partition.cprange.lo_val); + } + + if (iter->partition.cprange.hi_val == std::numeric_limits::max() || + iter->partition.cprange.hi_val <= (std::numeric_limits::min() + 2)) + { + table->field[5]->set_null(); + } + else + { + table->field[5]->set_notnull(); + table->field[5]->store(iter->partition.cprange.hi_val); + } + + table->field[6]->store(iter->colWid); + + } + else + { + table->field[1]->store("Dictionary", strlen("Dictionary"), cs); + table->field[4]->set_null(); + table->field[5]->set_null(); + table->field[6]->store(8192); + } + + table->field[2]->store(iter->range.start); + table->field[3]->store(iter->range.start + (iter->range.size 
* 1024) - 1); + + table->field[7]->store(iter->dbRoot); + table->field[8]->store(iter->partitionNum); + table->field[9]->store(iter->segmentNum); + table->field[10]->store(iter->blockOffset); + table->field[11]->store(iter->range.size * 1024); + table->field[12]->store(iter->HWM); + + switch (iter->partition.cprange.isValid) + { + case 0: + table->field[13]->store("Invalid", strlen("Invalid"), cs); + break; + + case 1: + table->field[13]->store("Updating", strlen("Updating"), cs); + break; + + case 2: + table->field[13]->store("Valid", strlen("Valid"), cs); + break; + + default: + table->field[13]->store("Unknown", strlen("Unknown"), cs); + break; + } + + switch (iter->status) + { + case BRM::EXTENTAVAILABLE: + table->field[14]->store("Available", strlen("Available"), cs); + break; + + case BRM::EXTENTUNAVAILABLE: + table->field[14]->store("Unavailable", strlen("Unavailable"), cs); + break; + + case BRM::EXTENTOUTOFSERVICE: + table->field[14]->store("Out of service", strlen("Out of service"), cs); + break; + + default: + table->field[14]->store("Unknown", strlen("Unknown"), cs); + } + + // MCOL-1016: on multiple segments HWM is set to 0 on the lower + // segments, we don't want these to show as 8KB. The down side is + // if the column has less than 1 block it will show as 0 bytes. + // We have no lookahead without it getting messy so this is the + // best compromise. 
+ if (iter->HWM == 0) + { + table->field[15]->store(0); + } + else + { + table->field[15]->store((iter->HWM + 1) * 8192); + } + + if (schema_table_store_record(thd, table)) + { + delete emp; + return 1; + } + + iter++; + + } + return 0; +} + +static int is_columnstore_extents_fill(THD *thd, TABLE_LIST *tables, COND *cond) +{ + BRM::OID_t cond_oid = 0; + TABLE *table = tables->table; + + BRM::DBRM *emp = new BRM::DBRM(); if (!emp || !emp->isDBRMReady()) { return 1; } - execplan::ObjectIDManager oidm; - BRM::OID_t MaxOID = oidm.size(); - - for (BRM::OID_t oid = 3000; oid <= MaxOID; oid++) + if (cond && cond->type() == Item::FUNC_ITEM) { - emp->getExtents(oid, entries, false, false, true); - - if (entries.size() == 0) - continue; - - iter = entries.begin(); - end = entries.end(); - - while (iter != end) + Item_func* fitem = (Item_func*) cond; + if ((fitem->functype() == Item_func::EQ_FUNC) && (fitem->argument_count() == 2)) { - table->field[0]->store(oid); - - if (iter->colWid > 0) + if(fitem->arguments()[0]->real_item()->type() == Item::FIELD_ITEM && + fitem->arguments()[1]->const_item()) { - table->field[1]->store("Column", strlen("Column"), cs); - - if (iter->partition.cprange.lo_val == std::numeric_limits::max() || - iter->partition.cprange.lo_val <= (std::numeric_limits::min() + 2)) + // WHERE object_id = value + Item_field *item_field = (Item_field*) fitem->arguments()[0]->real_item(); + if (strcasecmp(item_field->field_name, "object_id") == 0) { - table->field[4]->set_null(); + cond_oid = fitem->arguments()[1]->val_int(); + return generate_result(cond_oid, emp, table, thd); } - else + } + else if (fitem->arguments()[1]->real_item()->type() == Item::FIELD_ITEM && + fitem->arguments()[0]->const_item()) + { + // WHERE value = object_id + Item_field *item_field = (Item_field*) fitem->arguments()[1]->real_item(); + if (strcasecmp(item_field->field_name, "object_id") == 0) { - table->field[4]->set_notnull(); - table->field[4]->store(iter->partition.cprange.lo_val); + 
cond_oid = fitem->arguments()[0]->val_int(); + return generate_result(cond_oid, emp, table, thd); } - - if (iter->partition.cprange.hi_val == std::numeric_limits::max() || - iter->partition.cprange.hi_val <= (std::numeric_limits::min() + 2)) + } + } + else if (fitem->functype() == Item_func::IN_FUNC) + { + // WHERE object_id in (value1, value2) + Item_field *item_field = (Item_field*) fitem->arguments()[0]->real_item(); + if (strcasecmp(item_field->field_name, "object_id") == 0) + { + for (unsigned int i=1; i < fitem->argument_count(); i++) { - table->field[5]->set_null(); + cond_oid = fitem->arguments()[i]->val_int(); + int result = generate_result(cond_oid, emp, table, thd); + if (result) + return 1; } - else - { - table->field[5]->set_notnull(); - table->field[5]->store(iter->partition.cprange.hi_val); - } - - table->field[6]->store(iter->colWid); - } - else + } + else if (fitem->functype() == Item_func::UNKNOWN_FUNC && + strcasecmp(fitem->func_name(), "find_in_set") == 0) + { + // WHERE FIND_IN_SET(object_id, values) + String *tmp_var = fitem->arguments()[1]->val_str(); + std::stringstream ss(tmp_var->ptr()); + while (ss >> cond_oid) { - table->field[1]->store("Dictionary", strlen("Dictionary"), cs); - table->field[4]->set_null(); - table->field[5]->set_null(); - table->field[6]->store(8192); + int ret = generate_result(cond_oid, emp, table, thd); + if (ret) + return 1; + if (ss.peek() == ',') + ss.ignore(); } - - table->field[2]->store(iter->range.start); - table->field[3]->store(iter->range.start + (iter->range.size * 1024) - 1); - - table->field[7]->store(iter->dbRoot); - table->field[8]->store(iter->partitionNum); - table->field[9]->store(iter->segmentNum); - table->field[10]->store(iter->blockOffset); - table->field[11]->store(iter->range.size * 1024); - table->field[12]->store(iter->HWM); - - switch (iter->partition.cprange.isValid) - { - case 0: - table->field[13]->store("Invalid", strlen("Invalid"), cs); - break; - - case 1: - 
table->field[13]->store("Updating", strlen("Updating"), cs); - break; - - case 2: - table->field[13]->store("Valid", strlen("Valid"), cs); - break; - - default: - table->field[13]->store("Unknown", strlen("Unknown"), cs); - break; - } - - switch (iter->status) - { - case BRM::EXTENTAVAILABLE: - table->field[14]->store("Available", strlen("Available"), cs); - break; - - case BRM::EXTENTUNAVAILABLE: - table->field[14]->store("Unavailable", strlen("Unavailable"), cs); - break; - - case BRM::EXTENTOUTOFSERVICE: - table->field[14]->store("Out of service", strlen("Out of service"), cs); - break; - - default: - table->field[14]->store("Unknown", strlen("Unknown"), cs); - } - - // MCOL-1016: on multiple segments HWM is set to 0 on the lower - // segments, we don't want these to show as 8KB. The down side is - // if the column has less than 1 block it will show as 0 bytes. - // We have no lookahead without it getting messy so this is the - // best compromise. - if (iter->HWM == 0) - { - table->field[15]->store(0); - } - else - { - table->field[15]->store((iter->HWM + 1) * 8192); - } - - if (schema_table_store_record(thd, table)) - { - delete emp; - return 1; - } - - iter++; - } } + execplan::ObjectIDManager oidm; + BRM::OID_t MaxOID = oidm.size(); + + for(BRM::OID_t oid = 3000; oid <= MaxOID; oid++) + { + int result = generate_result(oid, emp, table, thd); + if (result) + return 1; + } delete emp; return 0; } diff --git a/dbcon/mysql/is_columnstore_files.cpp b/dbcon/mysql/is_columnstore_files.cpp index 740075165..815345474 100644 --- a/dbcon/mysql/is_columnstore_files.cpp +++ b/dbcon/mysql/is_columnstore_files.cpp @@ -84,12 +84,10 @@ static bool get_file_sizes(messageqcpp::MessageQueueClient* msgQueueClient, cons } } -static int is_columnstore_files_fill(THD* thd, TABLE_LIST* tables, COND* cond) +static int generate_result(BRM::OID_t oid, BRM::DBRM *emp, TABLE *table, THD *thd) { - BRM::DBRM* emp = new BRM::DBRM(); std::vector entries; CHARSET_INFO* cs = 
system_charset_info; - TABLE* table = tables->table; char oidDirName[WriteEngine::FILE_NAME_SIZE]; char fullFileName[WriteEngine::FILE_NAME_SIZE]; @@ -103,99 +101,174 @@ static int is_columnstore_files_fill(THD* thd, TABLE_LIST* tables, COND* cond) oam::Oam oam_instance; int pmId = 0; + emp->getExtents(oid, entries, false, false, true); + + if (entries.size() == 0) + return 0; + + std::vector::const_iterator iter = entries.begin(); + + while ( iter != entries.end() ) //organize extents into files + { + // Don't include files more than once at different block offsets + if (iter->blockOffset > 0) + { + iter++; + return 0; + } + + try + { + oam_instance.getDbrootPmConfig(iter->dbRoot, pmId); + } + catch (std::runtime_error) + { + // MCOL-1116: If we are here a DBRoot is offline/missing + iter++; + return 0; + } + + table->field[0]->store(oid); + table->field[1]->store(iter->segmentNum); + table->field[2]->store(iter->partitionNum); + + WriteEngine::Convertor::oid2FileName(oid, oidDirName, dbDir, iter->partitionNum, iter->segmentNum); + std::stringstream DbRootName; + DbRootName << "DBRoot" << iter->dbRoot; + std::string DbRootPath = config->getConfig("SystemConfig", DbRootName.str()); + fileSize = compressedFileSize = 0; + snprintf(fullFileName, WriteEngine::FILE_NAME_SIZE, "%s/%s", DbRootPath.c_str(), oidDirName); + + std::ostringstream oss; + oss << "pm" << pmId << "_WriteEngineServer"; + std::string client = oss.str(); + msgQueueClient = messageqcpp::MessageQueueClientPool::getInstance(oss.str()); + + if (!get_file_sizes(msgQueueClient, fullFileName, &fileSize, &compressedFileSize)) + { + messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient); + delete emp; + return 1; + } + + table->field[3]->store(fullFileName, strlen(fullFileName), cs); + + if (fileSize > 0) + { + table->field[4]->set_notnull(); + table->field[4]->store(fileSize); + + if (compressedFileSize > 0) + { + table->field[5]->set_notnull(); + table->field[5]->store(compressedFileSize); + } 
+ else + { + table->field[5]->set_null(); + } + } + else + { + table->field[4]->set_null(); + table->field[5]->set_null(); + } + + if (schema_table_store_record(thd, table)) + { + messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient); + delete emp; + return 1; + } + + iter++; + messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient); + msgQueueClient = NULL; + } + return 0; +} + +static int is_columnstore_files_fill(THD *thd, TABLE_LIST *tables, COND *cond) +{ + BRM::DBRM *emp = new BRM::DBRM(); + BRM::OID_t cond_oid = 0; + TABLE *table = tables->table; + if (!emp || !emp->isDBRMReady()) { return 1; } + if (cond && cond->type() == Item::FUNC_ITEM) + { + Item_func* fitem = (Item_func*) cond; + if ((fitem->functype() == Item_func::EQ_FUNC) && (fitem->argument_count() == 2)) + { + if(fitem->arguments()[0]->real_item()->type() == Item::FIELD_ITEM && + fitem->arguments()[1]->const_item()) + { + // WHERE object_id = value + Item_field *item_field = (Item_field*) fitem->arguments()[0]->real_item(); + if (strcasecmp(item_field->field_name, "object_id") == 0) + { + cond_oid = fitem->arguments()[1]->val_int(); + return generate_result(cond_oid, emp, table, thd); + } + } + else if (fitem->arguments()[1]->real_item()->type() == Item::FIELD_ITEM && + fitem->arguments()[0]->const_item()) + { + // WHERE value = object_id + Item_field *item_field = (Item_field*) fitem->arguments()[1]->real_item(); + if (strcasecmp(item_field->field_name, "object_id") == 0) + { + cond_oid = fitem->arguments()[0]->val_int(); + return generate_result(cond_oid, emp, table, thd); + } + } + } + else if (fitem->functype() == Item_func::IN_FUNC) + { + // WHERE object_id in (value1, value2) + Item_field *item_field = (Item_field*) fitem->arguments()[0]->real_item(); + if (strcasecmp(item_field->field_name, "object_id") == 0) + { + for (unsigned int i=1; i < fitem->argument_count(); i++) + { + cond_oid = fitem->arguments()[i]->val_int(); + int result = generate_result(cond_oid, 
emp, table, thd); + if (result) + return 1; + } + } + } + else if (fitem->functype() == Item_func::UNKNOWN_FUNC && + strcasecmp(fitem->func_name(), "find_in_set") == 0) + { + // WHERE FIND_IN_SET(object_id, values) + String *tmp_var = fitem->arguments()[1]->val_str(); + std::stringstream ss(tmp_var->ptr()); + while (ss >> cond_oid) + { + int ret = generate_result(cond_oid, emp, table, thd); + if (ret) + return 1; + if (ss.peek() == ',') + ss.ignore(); + } + } + } + execplan::ObjectIDManager oidm; BRM::OID_t MaxOID = oidm.size(); - for (BRM::OID_t oid = 3000; oid <= MaxOID; oid++) + if (!cond_oid) { - emp->getExtents(oid, entries, false, false, true); - - if (entries.size() == 0) - continue; - - std::vector::const_iterator iter = entries.begin(); - - while ( iter != entries.end() ) //organize extents into files + for(BRM::OID_t oid = 3000; oid <= MaxOID; oid++) { - // Don't include files more than once at different block offsets - if (iter->blockOffset > 0) - { - iter++; - continue; - } - - try - { - oam_instance.getDbrootPmConfig(iter->dbRoot, pmId); - } - catch (std::runtime_error) - { - // MCOL-1116: If we are here a DBRoot is offline/missing - iter++; - continue; - } - - table->field[0]->store(oid); - table->field[1]->store(iter->segmentNum); - table->field[2]->store(iter->partitionNum); - - WriteEngine::Convertor::oid2FileName(oid, oidDirName, dbDir, iter->partitionNum, iter->segmentNum); - std::stringstream DbRootName; - DbRootName << "DBRoot" << iter->dbRoot; - std::string DbRootPath = config->getConfig("SystemConfig", DbRootName.str()); - fileSize = compressedFileSize = 0; - snprintf(fullFileName, WriteEngine::FILE_NAME_SIZE, "%s/%s", DbRootPath.c_str(), oidDirName); - - std::ostringstream oss; - oss << "pm" << pmId << "_WriteEngineServer"; - std::string client = oss.str(); - msgQueueClient = messageqcpp::MessageQueueClientPool::getInstance(oss.str()); - - if (!get_file_sizes(msgQueueClient, fullFileName, &fileSize, &compressedFileSize)) - { - 
messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient); - delete emp; + int result = generate_result(oid, emp, table, thd); + if (result) return 1; - } - - table->field[3]->store(fullFileName, strlen(fullFileName), cs); - - if (fileSize > 0) - { - table->field[4]->set_notnull(); - table->field[4]->store(fileSize); - - if (compressedFileSize > 0) - { - table->field[5]->set_notnull(); - table->field[5]->store(compressedFileSize); - } - else - { - table->field[5]->set_null(); - } - } - else - { - table->field[4]->set_null(); - table->field[5]->set_null(); - } - - if (schema_table_store_record(thd, table)) - { - messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient); - delete emp; - return 1; - } - - iter++; - messageqcpp::MessageQueueClientPool::releaseInstance(msgQueueClient); - msgQueueClient = NULL; } } diff --git a/dbcon/mysql/is_columnstore_tables.cpp b/dbcon/mysql/is_columnstore_tables.cpp index 7c52f6328..ddc21e6f5 100644 --- a/dbcon/mysql/is_columnstore_tables.cpp +++ b/dbcon/mysql/is_columnstore_tables.cpp @@ -42,22 +42,91 @@ ST_FIELD_INFO is_columnstore_tables_fields[] = {0, 0, MYSQL_TYPE_NULL, 0, 0, 0, 0} }; +static void get_cond_item(Item_func *item, String **table, String **db) +{ + char tmp_char[MAX_FIELD_WIDTH]; + Item_field *item_field = (Item_field*) item->arguments()[0]->real_item(); + if (strcasecmp(item_field->field_name, "table_name") == 0) + { + String str_buf(tmp_char, sizeof(tmp_char), system_charset_info); + *table = item->arguments()[1]->val_str(&str_buf); + return; + } + else if (strcasecmp(item_field->field_name, "table_schema") == 0) + { + String str_buf(tmp_char, sizeof(tmp_char), system_charset_info); + *db = item->arguments()[1]->val_str(&str_buf); + return; + } +} + +static void get_cond_items(COND *cond, String **table, String **db) +{ + if (cond->type() == Item::FUNC_ITEM) + { + Item_func* fitem = (Item_func*) cond; + if (fitem->arguments()[0]->real_item()->type() == Item::FIELD_ITEM && + 
fitem->arguments()[1]->const_item()) + { + get_cond_item(fitem, table, db); + } + } + else if ((cond->type() == Item::COND_ITEM) && (((Item_cond*) cond)->functype() == Item_func::COND_AND_FUNC)) + { + List_iterator li(*((Item_cond*) cond)->argument_list()); + Item *item; + while ((item= li++)) + { + if (item->type() == Item::FUNC_ITEM) + { + get_cond_item((Item_func*)item, table, db); + } + else + { + get_cond_items(item, table, db); + } + } + } +} + static int is_columnstore_tables_fill(THD* thd, TABLE_LIST* tables, COND* cond) { CHARSET_INFO* cs = system_charset_info; TABLE* table = tables->table; + String *table_name = NULL; + String *db_name = NULL; boost::shared_ptr systemCatalogPtr = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(execplan::CalpontSystemCatalog::idb_tid2sid(thd->thread_id)); systemCatalogPtr->identity(execplan::CalpontSystemCatalog::FE); + if (cond) + { + get_cond_items(cond, &table_name, &db_name); + } + const std::vector< std::pair > catalog_tables = systemCatalogPtr->getTables(); for (std::vector >::const_iterator it = catalog_tables.begin(); it != catalog_tables.end(); ++it) { + if (db_name) + { + if ((*it).second.schema.compare(db_name->ptr()) != 0) + { + continue; + } + } + if (table_name) + { + if ((*it).second.table.compare(table_name->ptr()) != 0) + { + continue; + } + } + execplan::CalpontSystemCatalog::TableInfo tb_info = systemCatalogPtr->tableInfo((*it).second); std::string create_date = dataconvert::DataConvert::dateToString((*it).second.create_date); table->field[0]->store((*it).second.schema.c_str(), (*it).second.schema.length(), cs); diff --git a/oam/cloud/MCSVolumeCmds.sh b/oam/cloud/MCSVolumeCmds.sh index 291d27e44..c7a231261 100755 --- a/oam/cloud/MCSVolumeCmds.sh +++ b/oam/cloud/MCSVolumeCmds.sh @@ -202,7 +202,7 @@ detachvolume() { checkInfostatus if [ $STATUS == "detaching" ]; then retries=1 - while [ $retries -ne 60 ]; do + while [ $retries -ne 10 ]; do #retry until it's attached $AWSCLI detach-volume 
--volume-id $volumeName --region $Region > /tmp/volumeInfo_$volumeName 2>&1 @@ -239,7 +239,7 @@ attachvolume() { checkInfostatus if [ $STATUS == "attaching" -o $STATUS == "already-attached" ]; then retries=1 - while [ $retries -ne 60 ]; do + while [ $retries -ne 10 ]; do #check status until it's attached describevolume if [ $STATUS == "attached" ]; then diff --git a/oam/oamcpp/liboamcpp.cpp b/oam/oamcpp/liboamcpp.cpp index 92b2db3b7..2747bf1fa 100644 --- a/oam/oamcpp/liboamcpp.cpp +++ b/oam/oamcpp/liboamcpp.cpp @@ -5811,6 +5811,35 @@ bool Oam::autoMovePmDbroot(std::string residePM) exceptionControl("autoMovePmDbroot", API_INVALID_PARAMETER); } + //detach first to make sure DBRoots can be detached before trying to move to another pm + DBRootConfigList::iterator pt3 = residedbrootConfigList.begin(); + for( ; pt3 != residedbrootConfigList.end() ; pt3++ ) + { + int dbrootID = *pt3; + + try + { + typedef std::vector dbrootList; + dbrootList dbrootlist; + dbrootlist.push_back(itoa(dbrootID)); + + amazonDetach(dbrootlist); + } + catch (exception& ) + { + writeLog("ERROR: amazonDetach failure", LOG_TYPE_ERROR ); + + //reattach + typedef std::vector dbrootList; + dbrootList dbrootlist; + dbrootlist.push_back(itoa(dbrootID)); + + amazonAttach(residePM, dbrootlist); + + exceptionControl("autoMovePmDbroot", API_DETACH_FAILURE); + } + } + //get dbroot id for other PMs systemStorageInfo_t t; DeviceDBRootList moduledbrootlist; @@ -6330,10 +6359,9 @@ bool Oam::autoUnMovePmDbroot(std::string toPM) } if (!found) { - writeLog("ERROR: no dbroots found in ../Calpont/local/moveDbrootTransactionLog", LOG_TYPE_ERROR ); - cout << "ERROR: no dbroots found in " << fileName << endl; - exceptionControl("autoUnMovePmDbroot", API_FAILURE); + writeLog("No dbroots found in ../Calpont/local/moveDbrootTransactionLog", LOG_TYPE_DEBUG ); + cout << "No dbroots found in " << fileName << endl; } oldFile.close(); @@ -7746,7 +7773,7 @@ void Oam::actionMysqlCalpont(MYSQLCALPONT_ACTION action) else return; - // 
check if mysql-Capont is installed + // check if mysql-Columnstore is installed string mysqlscript = InstallDir + "/mysql/mysql-Columnstore"; if (access(mysqlscript.c_str(), X_OK) != 0) @@ -10344,6 +10371,146 @@ void Oam::sendStatusUpdate(ByteStream obs, ByteStream::byte returnRequestType) /*************************************************************************** * + * Function: amazonDetach + * + * Purpose: Amazon EC2 volume detach needed + * + ****************************************************************************/ + + void Oam::amazonDetach(dbrootList dbrootConfigList) + { + //if amazon cloud with external volumes, do the detach/attach moves + string cloud; + string DBRootStorageType; + try { + getSystemConfig("Cloud", cloud); + getSystemConfig("DBRootStorageType", DBRootStorageType); + } + catch(...) {} + + if ( (cloud == "amazon-ec2" || cloud == "amazon-vpc") && + DBRootStorageType == "external" ) + { + writeLog("amazonDetach function started ", LOG_TYPE_DEBUG ); + + dbrootList::iterator pt3 = dbrootConfigList.begin(); + for( ; pt3 != dbrootConfigList.end() ; pt3++) + { + string dbrootid = *pt3; + string volumeNameID = "PMVolumeName" + dbrootid; + string volumeName = oam::UnassignedName; + string deviceNameID = "PMVolumeDeviceName" + dbrootid; + string deviceName = oam::UnassignedName; + try { + getSystemConfig( volumeNameID, volumeName); + getSystemConfig( deviceNameID, deviceName); + } + catch(...) 
+ {} + + if ( volumeName == oam::UnassignedName || deviceName == oam::UnassignedName ) + { + cout << " ERROR: amazonDetach, invalid configure " + volumeName + ":" + deviceName << endl; + writeLog("ERROR: amazonDetach, invalid configure " + volumeName + ":" + deviceName, LOG_TYPE_ERROR ); + exceptionControl("amazonDetach", API_INVALID_PARAMETER); + } + + //send msg to to-pm to umount volume + int returnStatus = sendMsgToProcMgr(UNMOUNT, dbrootid, FORCEFUL, ACK_YES); + if (returnStatus != API_SUCCESS) { + writeLog("ERROR: amazonDetach, umount failed on " + dbrootid, LOG_TYPE_ERROR ); + } + + if (!detachEC2Volume(volumeName)) { + cout << " ERROR: amazonDetach, detachEC2Volume failed on " + volumeName << endl; + writeLog("ERROR: amazonDetach, detachEC2Volume failed on " + volumeName , LOG_TYPE_ERROR ); + exceptionControl("amazonDetach", API_FAILURE); + } + + writeLog("amazonDetach, detachEC2Volume passed on " + volumeName , LOG_TYPE_DEBUG ); + } + } + } + + /*************************************************************************** + * + * Function: amazonAttach + * + * Purpose: Amazon EC2 volume Attach needed + * + ****************************************************************************/ + + void Oam::amazonAttach(std::string toPM, dbrootList dbrootConfigList) + { + //if amazon cloud with external volumes, do the detach/attach moves + string cloud; + string DBRootStorageType; + try { + getSystemConfig("Cloud", cloud); + getSystemConfig("DBRootStorageType", DBRootStorageType); + } + catch(...) {} + + if ( (cloud == "amazon-ec2" || cloud == "amazon-vpc") && + DBRootStorageType == "external" ) + { + writeLog("amazonAttach function started ", LOG_TYPE_DEBUG ); + + //get Instance Name for to-pm + string toInstanceName = oam::UnassignedName; + try + { + ModuleConfig moduleconfig; + getSystemConfig(toPM, moduleconfig); + HostConfigList::iterator pt1 = moduleconfig.hostConfigList.begin(); + toInstanceName = (*pt1).HostName; + } + catch(...) 
+ {} + + if ( toInstanceName == oam::UnassignedName || toInstanceName.empty() ) + { + cout << " ERROR: amazonAttach, invalid Instance Name for " << toPM << endl; + writeLog("ERROR: amazonAttach, invalid Instance Name " + toPM, LOG_TYPE_ERROR ); + exceptionControl("amazonAttach", API_INVALID_PARAMETER); + } + + dbrootList::iterator pt3 = dbrootConfigList.begin(); + for( ; pt3 != dbrootConfigList.end() ; pt3++) + { + string dbrootid = *pt3; + string volumeNameID = "PMVolumeName" + dbrootid; + string volumeName = oam::UnassignedName; + string deviceNameID = "PMVolumeDeviceName" + dbrootid; + string deviceName = oam::UnassignedName; + try { + getSystemConfig( volumeNameID, volumeName); + getSystemConfig( deviceNameID, deviceName); + } + catch(...) + {} + + if ( volumeName == oam::UnassignedName || deviceName == oam::UnassignedName ) + { + cout << " ERROR: amazonAttach, invalid configure " + volumeName + ":" + deviceName << endl; + writeLog("ERROR: amazonAttach, invalid configure " + volumeName + ":" + deviceName, LOG_TYPE_ERROR ); + exceptionControl("amazonAttach", API_INVALID_PARAMETER); + } + + if (!attachEC2Volume(volumeName, deviceName, toInstanceName)) { + cout << " ERROR: amazonAttach, attachEC2Volume failed on " + volumeName + ":" + deviceName + ":" + toInstanceName << endl; + writeLog("ERROR: amazonAttach, attachEC2Volume failed on " + volumeName + ":" + deviceName + ":" + toInstanceName, LOG_TYPE_ERROR ); + exceptionControl("amazonAttach", API_FAILURE); + } + + writeLog("amazonAttach, attachEC2Volume passed on " + volumeName + ":" + toPM, LOG_TYPE_DEBUG ); + } + } + } + + + /*************************************************************************** + * * Function: amazonReattach * * Purpose: Amazon EC2 volume reattach needed @@ -10445,6 +10612,7 @@ void Oam::amazonReattach(std::string toPM, dbrootList dbrootConfigList, bool att } } + /*************************************************************************** * * Function: mountDBRoot diff --git 
a/oam/oamcpp/liboamcpp.h b/oam/oamcpp/liboamcpp.h index 66482bc25..3b18ac490 100644 --- a/oam/oamcpp/liboamcpp.h +++ b/oam/oamcpp/liboamcpp.h @@ -229,6 +229,7 @@ enum API_STATUS API_CONN_REFUSED, API_CANCELLED, API_STILL_WORKING, + API_DETACH_FAILURE, API_MAX }; @@ -2432,6 +2433,8 @@ public: void amazonReattach(std::string toPM, dbrootList dbrootConfigList, bool attach = false); void mountDBRoot(dbrootList dbrootConfigList, bool mount = true); + void amazonDetach(dbrootList dbrootConfigList); + void amazonAttach(std::string toPM, dbrootList dbrootConfigList); /** *@brief gluster control diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index fe087f8aa..f977149c1 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -2067,6 +2067,11 @@ struct ReadThread case DICT_CREATE_EQUALITY_FILTER: { PriorityThreadPool::Job job; + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; job.functor = boost::shared_ptr(new CreateEqualityFilter(bs)); OOBPool->addJob(job); break; @@ -2075,6 +2080,11 @@ struct ReadThread case DICT_DESTROY_EQUALITY_FILTER: { PriorityThreadPool::Job job; + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; job.functor = boost::shared_ptr(new DestroyEqualityFilter(bs)); OOBPool->addJob(job); break; @@ -2108,9 +2118,11 @@ struct ReadThread job.id = hdr->Hdr.UniqueID; job.weight = LOGICAL_BLOCK_RIDS; job.priority = hdr->Hdr.Priority; - - if (hdr->flags & IS_SYSCAT) - { + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; //boost::thread t(DictScanJob(outIos, 
bs, writeLock)); // using already-existing threads may cut latency // if it's changed back to running in an independent thread @@ -2155,9 +2167,12 @@ struct ReadThread job.id = bpps->getID(); job.weight = ismHdr->Size; job.priority = bpps->priority(); + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; - if (bpps->isSysCat()) - { //boost::thread t(*bpps); // using already-existing threads may cut latency // if it's changed back to running in an independent thread @@ -2176,6 +2191,11 @@ struct ReadThread { PriorityThreadPool::Job job; job.functor = boost::shared_ptr(new BPPHandler::Create(fBPPHandler, bs)); + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; OOBPool->addJob(job); //fBPPHandler->createBPP(*bs); break; @@ -2186,6 +2206,11 @@ struct ReadThread PriorityThreadPool::Job job; job.functor = boost::shared_ptr(new BPPHandler::AddJoiner(fBPPHandler, bs)); job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; OOBPool->addJob(job); //fBPPHandler->addJoinerToBPP(*bs); break; @@ -2199,6 +2224,11 @@ struct ReadThread PriorityThreadPool::Job job; job.functor = boost::shared_ptr(new BPPHandler::LastJoiner(fBPPHandler, bs)); job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; OOBPool->addJob(job); break; } @@ -2210,6 +2240,11 @@ struct ReadThread PriorityThreadPool::Job job; job.functor = boost::shared_ptr(new 
BPPHandler::Destroy(fBPPHandler, bs)); job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; OOBPool->addJob(job); //fBPPHandler->destroyBPP(*bs); break; @@ -2228,6 +2263,11 @@ struct ReadThread PriorityThreadPool::Job job; job.functor = boost::shared_ptr(new BPPHandler::Abort(fBPPHandler, bs)); job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; OOBPool->addJob(job); break; } diff --git a/procmgr/main.cpp b/procmgr/main.cpp index e12b96f2a..ae662adcb 100644 --- a/procmgr/main.cpp +++ b/procmgr/main.cpp @@ -1688,7 +1688,7 @@ void pingDeviceThread() processManager.restartProcessType("WriteEngineServer", moduleName); //set module to enable state - processManager.enableModule(moduleName, oam::AUTO_OFFLINE); + processManager.enableModule(moduleName, oam::AUTO_OFFLINE, true); downActiveOAMModule = false; int retry; @@ -1784,7 +1784,7 @@ void pingDeviceThread() } else //set module to enable state - processManager.enableModule(moduleName, oam::AUTO_OFFLINE); + processManager.enableModule(moduleName, oam::AUTO_OFFLINE, true); //restart module processes int retry = 0; @@ -2094,7 +2094,7 @@ void pingDeviceThread() if ( PrimaryUMModuleName == moduleName ) downPrimaryUM = true; - // if not disabled and amazon, skip + // if disabled, skip if (opState != oam::AUTO_DISABLED ) { //Log failure, issue alarm, set moduleOpState @@ -2140,7 +2140,7 @@ void pingDeviceThread() if ( ( moduleName.find("pm") == 0 && !amazon && ( DBRootStorageType != "internal") ) || ( moduleName.find("pm") == 0 && amazon && downActiveOAMModule ) || ( moduleName.find("pm") == 0 && amazon && AmazonPMFailover == "y") ) - { + 
string error; try { log.writeLog(__LINE__, "Call autoMovePmDbroot", LOG_TYPE_DEBUG); @@ -2157,6 +2157,23 @@ void pingDeviceThread() catch (...) { log.writeLog(__LINE__, "EXCEPTION ERROR on autoMovePmDbroot: Caught unknown exception!", LOG_TYPE_ERROR); + } + + if ( error == oam.itoa(oam::API_DETACH_FAILURE) ) + { + processManager.setModuleState(moduleName, oam::AUTO_DISABLED); + + // resume the dbrm + oam.dbrmctl("resume"); + log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG); + + //enable query stats + dbrm.setSystemQueryReady(true); + + //set query system state ready + processManager.setQuerySystemState(true); + + break; } } } diff --git a/procmgr/processmanager.cpp b/procmgr/processmanager.cpp index 2bc12c330..7646a0c3d 100644 --- a/procmgr/processmanager.cpp +++ b/procmgr/processmanager.cpp @@ -3780,7 +3780,7 @@ void ProcessManager::recycleProcess(string module, bool enableModule) restartProcessType("ExeMgr"); sleep(1); - restartProcessType("mysql"); + restartProcessType("mysqld"); restartProcessType("WriteEngineServer"); sleep(1); @@ -3799,7 +3799,7 @@ void ProcessManager::recycleProcess(string module, bool enableModule) * purpose: Clear the Disable State on a specified module * ******************************************************************************************/ -int ProcessManager::enableModule(string target, int state) +int ProcessManager::enableModule(string target, int state, bool failover) { Oam oam; ModuleConfig moduleconfig; @@ -3839,7 +3839,8 @@ int ProcessManager::enableModule(string target, int state) setStandbyModule(newStandbyModule); //set recycle process - recycleProcess(target); + if (!failover) + recycleProcess(target); log.writeLog(__LINE__, "enableModule request for " + target + " completed", LOG_TYPE_DEBUG); @@ -4647,7 +4648,7 @@ int ProcessManager::restartProcessType( std::string processName, std::string ski PMwithUM = "n"; } - // If mysql is the processName, then send to modules were ExeMgr is running + // If mysqld is 
the processName, then send to modules were ExeMgr is running try { oam.getProcessStatus(systemprocessstatus); @@ -4658,7 +4659,7 @@ int ProcessManager::restartProcessType( std::string processName, std::string ski if ( systemprocessstatus.processstatus[i].Module == skipModule ) continue; - if ( processName == "mysql" ) + if ( processName == "mysqld" ) { { if ( systemprocessstatus.processstatus[i].ProcessName == "ExeMgr") { @@ -9813,7 +9814,7 @@ int ProcessManager::OAMParentModuleChange() { log.writeLog(__LINE__, "System Active, restart needed processes", LOG_TYPE_DEBUG); - processManager.restartProcessType("mysql"); + processManager.restartProcessType("mysqld"); processManager.restartProcessType("ExeMgr"); processManager.restartProcessType("WriteEngineServer"); processManager.reinitProcessType("DBRMWorkerNode"); @@ -11013,7 +11014,7 @@ void ProcessManager::stopProcessTypes(bool manualFlag) log.writeLog(__LINE__, "stopProcessTypes Called"); //front-end first - processManager.stopProcessType("mysql", manualFlag); + processManager.stopProcessType("mysqld", manualFlag); processManager.stopProcessType("DMLProc", manualFlag); processManager.stopProcessType("DDLProc", manualFlag); processManager.stopProcessType("ExeMgr", manualFlag); diff --git a/procmgr/processmanager.h b/procmgr/processmanager.h index fd2fe834e..9a57b38b5 100644 --- a/procmgr/processmanager.h +++ b/procmgr/processmanager.h @@ -309,7 +309,7 @@ public: /** *@brief Enable a specified module */ - int enableModule(std::string target, int state); + int enableModule(std::string target, int state, bool failover = false); /** *@brief Enable a specified module diff --git a/procmon/main.cpp b/procmon/main.cpp index d6b14abf4..41977139c 100644 --- a/procmon/main.cpp +++ b/procmon/main.cpp @@ -781,11 +781,11 @@ int main(int argc, char** argv) if ( ret != 0 ) log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR); - //mysql status monitor thread - if ( ( 
config.ServerInstallType() != oam::INSTALL_COMBINE_DM_UM_PM ) || - (PMwithUM == "y") ) + //mysqld status monitor thread + if ( config.moduleType() == "um" || + ( config.moduleType() == "pm" && config.ServerInstallType() == oam::INSTALL_COMBINE_DM_UM_PM ) || + ( config.moduleType() == "pm" && PMwithUM == "y") ) { - pthread_t mysqlThread; ret = pthread_create (&mysqlThread, NULL, (void* (*)(void*)) &mysqlMonitorThread, NULL); @@ -1233,7 +1233,7 @@ static void mysqlMonitorThread(MonitorConfig config) catch (...) {} - sleep(10); + sleep(5); } } diff --git a/procmon/processmonitor.cpp b/procmon/processmonitor.cpp index 823060ad2..3da88c2df 100644 --- a/procmon/processmonitor.cpp +++ b/procmon/processmonitor.cpp @@ -484,7 +484,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO log.writeLog(__LINE__, "MSG RECEIVED: Stop process request on " + processName); int requestStatus = API_SUCCESS; - // check for mysql + // check for mysqld if ( processName == "mysqld" ) { try @@ -553,7 +553,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO msg >> manualFlag; log.writeLog(__LINE__, "MSG RECEIVED: Start process request on: " + processName); - // check for mysql + // check for mysqld if ( processName == "mysqld" ) { try @@ -684,7 +684,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO log.writeLog(__LINE__, "MSG RECEIVED: Restart process request on " + processName); int requestStatus = API_SUCCESS; - // check for mysql restart + // check for mysqld restart if ( processName == "mysqld" ) { try @@ -933,7 +933,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO log.writeLog(__LINE__, "Error running DBRM clearShm", LOG_TYPE_ERROR); } - //stop the mysql daemon + //stop the mysqld daemon try { oam.actionMysqlCalpont(MYSQL_STOP); @@ -1071,13 +1071,13 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO system(cmd.c_str()); 
- //start the mysql daemon + //start the mysqld daemon try { oam.actionMysqlCalpont(MYSQL_START); } catch (...) - { + { // mysqld didn't start, return with error // mysql didn't start, return with error log.writeLog(__LINE__, "STARTALL: MySQL failed to start, start-module failure", LOG_TYPE_CRITICAL); @@ -1366,7 +1366,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO //send down notification oam.sendDeviceNotification(config.moduleName(), MODULE_DOWN); - //stop the mysql daemon and then columnstore + //stop the mysqld daemon and then columnstore try { oam.actionMysqlCalpont(MYSQL_STOP); } @@ -1548,7 +1548,7 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO } } - // install mysql rpms if being reconfigured as a um + // install mysqld rpms if being reconfigured as a um if ( reconfigureModuleName.find("um") != string::npos ) { string cmd = startup::StartUp::installDir() + "/bin/post-mysqld-install >> /tmp/rpminstall"; diff --git a/utils/libmysql_client/libmysql_client.cpp b/utils/libmysql_client/libmysql_client.cpp index c12abafdf..9c67d3fa8 100644 --- a/utils/libmysql_client/libmysql_client.cpp +++ b/utils/libmysql_client/libmysql_client.cpp @@ -119,13 +119,17 @@ int LibMySQL::run(const char* query) void LibMySQL::handleMySqlError(const char* errStr, unsigned int errCode) { - ostringstream oss; - oss << errStr << "(" << errCode << ")"; - - if (errCode == (unsigned int) - 1) - oss << "(null pointer)"; + ostringstream oss; + if (mysql->getErrno()) + { + oss << errStr << " (" << mysql->getErrno() << ")"; + oss << " (" << mysql->getErrorMsg() << ")"; + } else - oss << "(" << errCode << ")"; + { + oss << errStr << " (" << errCode << ")"; + oss << " (unknown)"; + } throw logging::IDBExcept(oss.str(), logging::ERR_CROSS_ENGINE_CONNECT); diff --git a/utils/libmysql_client/libmysql_client.h b/utils/libmysql_client/libmysql_client.h index 41ee5f9de..7d3c258e7 100644 --- 
a/utils/libmysql_client/libmysql_client.h +++ b/utils/libmysql_client/libmysql_client.h @@ -71,6 +71,8 @@ public: { return fErrStr; } + unsigned int getErrno() { return mysql_errno(fCon); } + const char* getErrorMsg() { return mysql_error(fCon); } private: MYSQL* fCon; diff --git a/utils/threadpool/prioritythreadpool.cpp b/utils/threadpool/prioritythreadpool.cpp index b223cee8b..c2326a78f 100644 --- a/utils/threadpool/prioritythreadpool.cpp +++ b/utils/threadpool/prioritythreadpool.cpp @@ -33,6 +33,8 @@ using namespace logging; #include "prioritythreadpool.h" using namespace boost; +#include "dbcon/joblist/primitivemsg.h" + namespace threadpool { @@ -51,9 +53,9 @@ PriorityThreadPool::PriorityThreadPool(uint targetWeightPerRun, uint highThreads cout << "started " << highThreads << " high, " << midThreads << " med, " << lowThreads << " low.\n"; - threadCounts[HIGH] = highThreads; - threadCounts[MEDIUM] = midThreads; - threadCounts[LOW] = lowThreads; + defaultThreadCounts[HIGH] = threadCounts[HIGH] = highThreads; + defaultThreadCounts[MEDIUM] = threadCounts[MEDIUM] = midThreads; + defaultThreadCounts[LOW] = threadCounts[LOW] = lowThreads; } PriorityThreadPool::~PriorityThreadPool() @@ -68,6 +70,23 @@ void PriorityThreadPool::addJob(const Job& job, bool useLock) if (useLock) lk.lock(); + // Create any missing threads + if (defaultThreadCounts[HIGH] != threadCounts[HIGH]) + { + threads.create_thread(ThreadHelper(this, HIGH)); + threadCounts[HIGH]++; + } + if (defaultThreadCounts[MEDIUM] != threadCounts[MEDIUM]) + { + threads.create_thread(ThreadHelper(this, MEDIUM)); + threadCounts[MEDIUM]++; + } + if (defaultThreadCounts[LOW] != threadCounts[LOW]) + { + threads.create_thread(ThreadHelper(this, LOW)); + threadCounts[LOW]++; + } + if (job.priority > 66) jobQueues[HIGH].push_back(job); else if (job.priority > 33) @@ -113,80 +132,146 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() vector reschedule; uint32_t rescheduleCount; uint32_t queueSize; 
+ bool running = false; - while (!_stop) + try { + while (!_stop) { - mutex::scoped_lock lk(mutex); - - queue = pickAQueue(preferredQueue); + mutex::scoped_lock lk(mutex); + queue = pickAQueue(preferredQueue); + if (jobQueues[queue].empty()) { if (jobQueues[queue].empty()) { - newJob.wait(lk); - continue; - } + newJob.wait(lk); + continue; + } - queueSize = jobQueues[queue].size(); - weight = 0; - // 3 conditions stop this thread from grabbing all jobs in the queue - // - // 1: The weight limit has been exceeded - // 2: The queue is empty - // 3: It has grabbed more than half of the jobs available & - // should leave some to the other threads + queueSize = jobQueues[queue].size(); + weight = 0; + // 3 conditions stop this thread from grabbing all jobs in the queue + // + // 1: The weight limit has been exceeded + // 2: The queue is empty + // 3: It has grabbed more than half of the jobs available & + // should leave some to the other threads - while ((weight < weightPerRun) && (!jobQueues[queue].empty()) - && (runList.size() <= queueSize / 2)) + while ((weight < weightPerRun) && (!jobQueues[queue].empty()) + && (runList.size() <= queueSize/2)) { { - runList.push_back(jobQueues[queue].front()); - jobQueues[queue].pop_front(); - weight += runList.back().weight; - } + runList.push_back(jobQueues[queue].front()); + jobQueues[queue].pop_front(); + weight += runList.back().weight; + } - lk.unlock(); + lk.unlock(); - reschedule.resize(runList.size()); - rescheduleCount = 0; - - for (i = 0; i < runList.size() && !_stop; i++) + reschedule.resize(runList.size()); + rescheduleCount = 0; + for (i = 0; i < runList.size() && !_stop; i++) { { try { - reschedule[i] = false; - reschedule[i] = (*(runList[i].functor))(); - - if (reschedule[i]) - rescheduleCount++; + reschedule[i] = false; + running = true; + reschedule[i] = (*(runList[i].functor))(); + running = false; + if (reschedule[i]) + rescheduleCount++; } - catch (std::exception& e) { - cerr << e.what() << endl; - } - } - // 
no real work was done, prevent intensive busy waiting - if (rescheduleCount == runList.size()) - usleep(1000); + // no real work was done, prevent intensive busy waiting + if (rescheduleCount == runList.size()) + usleep(1000); - if (rescheduleCount > 0) + if (rescheduleCount > 0) { { - lk.lock(); + lk.lock(); - for (i = 0; i < runList.size(); i++) - if (reschedule[i]) - addJob(runList[i], false); + for (i = 0; i < runList.size(); i++) + if (reschedule[i]) + addJob(runList[i], false); - if (rescheduleCount > 1) - newJob.notify_all(); - else - newJob.notify_one(); + if (rescheduleCount > 1) + newJob.notify_all(); + else + newJob.notify_one(); - lk.unlock(); + lk.unlock(); + } + + runList.clear(); } - - runList.clear(); } + catch (std::exception &ex) + { + // Log the exception and exit this thread + try + { + threadCounts[queue]--; +#ifndef NOLOGGING + logging::Message::Args args; + logging::Message message(5); + args.add("threadFcn: Caught exception: "); + args.add(ex.what()); + + message.format( args ); + + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + + ml.logErrorMessage( message ); +#endif + if (running) + sendErrorMsg(runList[i].uniqueID, runList[i].stepID, runList[i].sock); + } + catch (...) + { + } + } + catch (...) + { + + // Log the exception and exit this thread + try + { + threadCounts[queue]--; +#ifndef NOLOGGING + logging::Message::Args args; + logging::Message message(6); + args.add("threadFcn: Caught unknown exception!"); + + message.format( args ); + + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + + ml.logErrorMessage( message ); +#endif + if (running) + sendErrorMsg(runList[i].uniqueID, runList[i].stepID, runList[i].sock); + } + catch (...) 
+ { + } + } +} + +void PriorityThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock) +{ + ISMPacketHeader ism; + PrimitiveHeader ph = {0}; + + ism.Status = logging::primitiveServerErr; + ph.UniqueID = id; + ph.StepID = step; + ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader)); + msg.append((uint8_t *) &ism, sizeof(ism)); + msg.append((uint8_t *) &ph, sizeof(ph)); + + sock->write(msg); } void PriorityThreadPool::stop() diff --git a/utils/threadpool/prioritythreadpool.h b/utils/threadpool/prioritythreadpool.h index 2a31a7725..f58af4d8d 100644 --- a/utils/threadpool/prioritythreadpool.h +++ b/utils/threadpool/prioritythreadpool.h @@ -36,6 +36,7 @@ #include #include #include "../winport/winport.h" +#include "primitives/primproc/umsocketselector.h" namespace threadpool { @@ -62,6 +63,9 @@ public: uint32_t weight; uint32_t priority; uint32_t id; + uint32_t uniqueID; + uint32_t stepID; + primitiveprocessor::SP_UM_IOSOCK sock; }; enum Priority @@ -112,9 +116,11 @@ private: Priority pickAQueue(Priority preference); void threadFcn(const Priority preferredQueue) throw(); + void sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock); std::list jobQueues[3]; // higher indexes = higher priority uint32_t threadCounts[3]; + uint32_t defaultThreadCounts[3]; boost::mutex mutex; boost::condition newJob; boost::thread_group threads; diff --git a/writeengine/wrapper/writeengine.cpp b/writeengine/wrapper/writeengine.cpp index 71d0e1fbd..754a7b464 100644 --- a/writeengine/wrapper/writeengine.cpp +++ b/writeengine/wrapper/writeengine.cpp @@ -2246,7 +2246,6 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid, if (it != aColExtsInfo.end()) //update hwm info { oldHwm = it->hwm; - } // save hwm for the old extent colWidth = colStructList[i].colWidth; @@ -2272,6 +2271,7 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid, else return ERR_INVALID_PARAM; + } //update hwm for the new 
extent if (newExtent) { @@ -2285,7 +2285,7 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid, it++; } - + colWidth = newColStructList[i].colWidth; succFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio); if (succFlag) @@ -2356,6 +2356,9 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid, curFbo)); } } + else + return ERR_INVALID_PARAM; + } } // If we create a new extent for this batch @@ -2376,7 +2379,8 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid, curFbo)); } } - } + else + return ERR_INVALID_PARAM; } if (lbids.size() > 0) @@ -5132,7 +5136,7 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid, bool versioning) { int rc = 0; - void* valArray; + void* valArray = NULL; string segFile; Column curCol; ColStructList::size_type totalColumn; @@ -5158,146 +5162,148 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid, totalRow2 = 0; } - valArray = malloc(sizeof(uint64_t) * totalRow1); - - if (totalRow1 == 0) + // It is possible totalRow1 is zero but totalRow2 has values + if ((totalRow1 == 0) && (totalRow2 == 0)) return rc; TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid); - - for (i = 0; i < totalColumn; i++) + if (totalRow1) { - //@Bug 2205 Check if all rows go to the new extent - //Write the first batch - RID* firstPart = rowIdArray; - ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)]; - - // set params - colOp->initColumn(curCol); - // need to pass real dbRoot, partition, and segment to setColParam - colOp->setColParam(curCol, 0, colStructList[i].colWidth, - colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid, - colStructList[i].fCompressionType, colStructList[i].fColDbRoot, - colStructList[i].fColPartition, colStructList[i].fColSegment); - - ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid); - ColExtsInfo::iterator it = 
aColExtsInfo.begin(); - - while (it != aColExtsInfo.end()) + valArray = malloc(sizeof(uint64_t) * totalRow1); + for (i = 0; i < totalColumn; i++) { - if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment)) - break; + //@Bug 2205 Check if all rows go to the new extent + //Write the first batch + RID * firstPart = rowIdArray; + ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)]; - it++; - } + // set params + colOp->initColumn(curCol); + // need to pass real dbRoot, partition, and segment to setColParam + colOp->setColParam(curCol, 0, colStructList[i].colWidth, + colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid, + colStructList[i].fCompressionType, colStructList[i].fColDbRoot, + colStructList[i].fColPartition, colStructList[i].fColSegment); - if (it == aColExtsInfo.end()) //add this one to the list - { - ColExtInfo aExt; - aExt.dbRoot = colStructList[i].fColDbRoot; - aExt.partNum = colStructList[i].fColPartition; - aExt.segNum = colStructList[i].fColSegment; - aExt.compType = colStructList[i].fCompressionType; - aColExtsInfo.push_back(aExt); - aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo); - } + ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid); + ColExtsInfo::iterator it = aColExtsInfo.begin(); - rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file + while (it != aColExtsInfo.end()) + { + if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment)) + break; - if (rc != NO_ERROR) - break; + it++; + } - // handling versioning - vector rangeList; + if (it == aColExtsInfo.end()) //add this one to the list + { + ColExtInfo aExt; + aExt.dbRoot =colStructList[i].fColDbRoot; + aExt.partNum = colStructList[i].fColPartition; + aExt.segNum = 
colStructList[i].fColSegment; + aExt.compType = colStructList[i].fCompressionType; + aColExtsInfo.push_back(aExt); + aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo); + } - if (versioning) - { - rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i], - colStructList[i].colWidth, totalRow1, firstPart, rangeList); + rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file if (rc != NO_ERROR) + break; + + // handling versioning + vector rangeList; + + if (versioning) { - if (colStructList[i].fCompressionType == 0) - { - curCol.dataFile.pFile->flush(); + rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i], + colStructList[i].colWidth, totalRow1, firstPart, rangeList); + if (rc != NO_ERROR) { + if (rc != NO_ERROR) + { + if (colStructList[i].fCompressionType == 0) + { + curCol.dataFile.pFile->flush(); + } + + BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); + break; } - - BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); - break; } - } - //totalRow1 -= totalRow2; - // have to init the size here - // nullArray = (bool*) malloc(sizeof(bool) * totalRow); - uint8_t tmp8; - uint16_t tmp16; - uint32_t tmp32; + //totalRow1 -= totalRow2; + // have to init the size here + // nullArray = (bool*) malloc(sizeof(bool) * totalRow); + uint8_t tmp8; + uint16_t tmp16; + uint32_t tmp32; - for (size_t j = 0; j < totalRow1; j++) - { - uint64_t curValue = colValueList[((totalRow1 + totalRow2) * i) + j]; - - switch (colStructList[i].colType) + for (size_t j = 0; j < totalRow1; j++) { - case WriteEngine::WR_VARBINARY : // treat same as char for now - case WriteEngine::WR_CHAR: - case WriteEngine::WR_BLOB: - case WriteEngine::WR_TEXT: - ((uint64_t*)valArray)[j] = curValue; - break; + uint64_t curValue = colValueList[((totalRow1 + totalRow2)*i) + j]; - case WriteEngine::WR_INT: - case WriteEngine::WR_UINT: - case WriteEngine::WR_FLOAT: - tmp32 = curValue; - 
((uint32_t*)valArray)[j] = tmp32; - break; + switch (colStructList[i].colType) + { + case WriteEngine::WR_VARBINARY : // treat same as char for now + case WriteEngine::WR_CHAR: + case WriteEngine::WR_BLOB: + case WriteEngine::WR_TEXT: + ((uint64_t*)valArray)[j] = curValue; + break; - case WriteEngine::WR_ULONGLONG: - case WriteEngine::WR_LONGLONG: - case WriteEngine::WR_DOUBLE: - case WriteEngine::WR_TOKEN: - ((uint64_t*)valArray)[j] = curValue; - break; + case WriteEngine::WR_INT: + case WriteEngine::WR_UINT: + case WriteEngine::WR_FLOAT: + tmp32 = curValue; + ((uint32_t*)valArray)[j] = tmp32; + break; - case WriteEngine::WR_BYTE: - case WriteEngine::WR_UBYTE: - tmp8 = curValue; - ((uint8_t*)valArray)[j] = tmp8; - break; + case WriteEngine::WR_ULONGLONG: + case WriteEngine::WR_LONGLONG: + case WriteEngine::WR_DOUBLE: + case WriteEngine::WR_TOKEN: + ((uint64_t*)valArray)[j] = curValue; + break; - case WriteEngine::WR_SHORT: - case WriteEngine::WR_USHORT: - tmp16 = curValue; - ((uint16_t*)valArray)[j] = tmp16; - break; + case WriteEngine::WR_BYTE: + case WriteEngine::WR_UBYTE: + tmp8 = curValue; + ((uint8_t*)valArray)[j] = tmp8; + break; + + case WriteEngine::WR_SHORT: + case WriteEngine::WR_USHORT: + tmp16 = curValue; + ((uint16_t*)valArray)[j] = tmp16; + break; + } } + + +#ifdef PROFILE + timer.start("writeRow "); +#endif + rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray); +#ifdef PROFILE + timer.stop("writeRow "); +#endif + colOp->closeColumnFile(curCol); + + if (versioning) + BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); + + // check error + if (rc != NO_ERROR) + break; + + } // end of for (i = 0 + + if (valArray != NULL) + { + free(valArray); + valArray = NULL; } - - -#ifdef PROFILE - timer.start("writeRow "); -#endif - rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray); -#ifdef PROFILE - timer.stop("writeRow "); -#endif - colOp->closeColumnFile(curCol); - - if (versioning) - BRMWrapper::getInstance()->writeVBEnd(txnid, 
rangeList); - - // check error - if (rc != NO_ERROR) - break; - - } // end of for (i = 0 - - if (valArray != NULL) - { - free(valArray); - valArray = NULL; } // MCOL-1176 - Write second extent