diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index b250c1c66..649bbe006 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ b/dbcon/joblist/distributedenginecomm.cpp @@ -1,5 +1,5 @@ /* Copyright (C) 2014 InfiniDB, Inc. - * Copyright (C) 2016-2020 MariaDB Corporation. + * Copyright (C) 2016-2022 MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -749,7 +749,7 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, { uint64_t totalUnackedWork = 0; - for (uint32_t i = 0; i < pmCount; i++) + for (int i = pmCount - 1; i >= 0; --i) totalUnackedWork += mqe->unackedWork[i]; if (totalUnackedWork == 0) @@ -769,7 +769,6 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, void DistributedEngineComm::nextPMToACK(boost::shared_ptr mqe, uint32_t maxAck, uint32_t* sockIndex, uint16_t* numToAck) { - uint32_t i; uint32_t& nextIndex = mqe->ackSocketIndex; /* Other threads can be touching mqe->unackedWork at the same time, but because of @@ -790,7 +789,7 @@ void DistributedEngineComm::nextPMToACK(boost::shared_ptr mqe, uint32_t max } else { - for (i = 0; i < pmCount; i++) + for (int i = pmCount - 1; i >= 0; --i) { uint32_t curVal = mqe->unackedWork[nextIndex]; uint32_t unackedWork = (curVal > maxAck ? maxAck : curVal); @@ -812,8 +811,7 @@ void DistributedEngineComm::nextPMToACK(boost::shared_ptr mqe, uint32_t max } cerr << "DEC::nextPMToACK(): Couldn't find a PM to ACK! "; - - for (i = 0; i < pmCount; i++) + for (int i = pmCount - 1; i >= 0; --i) cerr << mqe->unackedWork[i] << " "; cerr << " max: " << maxAck; @@ -1051,6 +1049,9 @@ void DistributedEngineComm::pushToTheLocalQueueAndNotifyRecv(const messageqcpp:: inMemoryEM2PPExchCV_.notify_one(); } +// This routine has an unexpected side-effect on its argument's SBS, namely +// SBS is cleared when it is sent to PP at the same host. 
This fact forces any +// code that uses ::writeToClient to send to a local node last. int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_t senderUniqueID, bool doInterleaving) { diff --git a/mysql-test/columnstore/multinode/mcol-5279.result b/mysql-test/columnstore/multinode/mcol-5279.result new file mode 100644 index 000000000..fb5c3b682 --- /dev/null +++ b/mysql-test/columnstore/multinode/mcol-5279.result @@ -0,0 +1,10 @@ +DROP DATABASE IF EXISTS mcol_5279; +CREATE DATABASE mcol_5279; +USE mcol_5279; +CREATE TABLE `dd`( `acct_sk` INT(11) UNSIGNED NOT NULL, `acct_id` VARCHAR(128) NOT NULL DEFAULT 'None', `db_source_sk` INT(11) UNSIGNED NOT NULL) ENGINE=Columnstore DEFAULT CHARSET=utf8mb4; +CREATE TABLE `ff` ( `db_source_sk` INT(11) UNSIGNED NOT NULL, `acct_id` VARCHAR(128) NOT NULL DEFAULT 'None', `acct_sk` INT(11) UNSIGNED NOT NULL DEFAULT 0 ) ENGINE=Columnstore DEFAULT CHARSET=utf8mb4; +INSERT INTO dd SELECT ROUND(RAND() * 10, 2), substring(MD5(RAND()),1,1), ROUND(RAND() * 100, 2) FROM seq_1_to_300000; +INSERT INTO ff SELECT ROUND(RAND() * 10, 2), substring(MD5(RAND()),1,1), ROUND(RAND() * 100, 2) FROM seq_1_to_300000; +SELECT * FROM (SELECT ff.acct_id, COUNT(1) FROM ff JOIN dd ON ff.acct_sk =dd.acct_sk AND ff.db_source_sk =dd.db_source_sk GROUP BY ff.acct_id LIMIT 10)s LIMIT 0; +acct_id COUNT(1) +DROP DATABASE mcol_5279; diff --git a/mysql-test/columnstore/multinode/mcol-5279.test b/mysql-test/columnstore/multinode/mcol-5279.test new file mode 100644 index 000000000..1a10b2cb7 --- /dev/null +++ b/mysql-test/columnstore/multinode/mcol-5279.test @@ -0,0 +1,25 @@ +# +# Test based on Jira MCOL-5279 +# Author: Roman Nozdrin roman.nozdrin@mariadb.com +# +-- source ../include/have_columnstore.inc + +--disable_warnings +DROP DATABASE IF EXISTS mcol_5279; +--enable_warnings + +CREATE DATABASE mcol_5279; +USE mcol_5279; + +# This test makes sense when run on a multi-node cluster. 
+CREATE TABLE `dd`( `acct_sk` INT(11) UNSIGNED NOT NULL, `acct_id` VARCHAR(128) NOT NULL DEFAULT 'None', `db_source_sk` INT(11) UNSIGNED NOT NULL) ENGINE=Columnstore DEFAULT CHARSET=utf8mb4; +CREATE TABLE `ff` ( `db_source_sk` INT(11) UNSIGNED NOT NULL, `acct_id` VARCHAR(128) NOT NULL DEFAULT 'None', `acct_sk` INT(11) UNSIGNED NOT NULL DEFAULT 0 ) ENGINE=Columnstore DEFAULT CHARSET=utf8mb4; + +INSERT INTO dd SELECT ROUND(RAND() * 10, 2), substring(MD5(RAND()),1,1), ROUND(RAND() * 100, 2) FROM seq_1_to_300000; +INSERT INTO ff SELECT ROUND(RAND() * 10, 2), substring(MD5(RAND()),1,1), ROUND(RAND() * 100, 2) FROM seq_1_to_300000; + +# The purpose is to run the statement that doesn't hang. +SELECT * FROM (SELECT ff.acct_id, COUNT(1) FROM ff JOIN dd ON ff.acct_sk =dd.acct_sk AND ff.db_source_sk =dd.db_source_sk GROUP BY ff.acct_id LIMIT 10)s LIMIT 0; + +# Clean UP +DROP DATABASE mcol_5279;