mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
MCOL-5279 This commit fixes the corner case of the issue that can rarely happen when flow control is used to backpressure PPs that overflows EM with Primitive messages. It also adds a relevant test that makes sense in a multi-node scenario only. (#2605)
Co-authored-by: Roman Nozdrin <rnozdrin@mariadb.com>
This commit is contained in:
parent
88404f70f1
commit
09d785fc9b
@ -1,5 +1,5 @@
|
||||
/* Copyright (C) 2014 InfiniDB, Inc.
|
||||
* Copyright (C) 2016-2020 MariaDB Corporation.
|
||||
* Copyright (C) 2016-2022 MariaDB Corporation.
|
||||
|
||||
This program is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU General Public License
|
||||
@ -749,7 +749,7 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
|
||||
{
|
||||
uint64_t totalUnackedWork = 0;
|
||||
|
||||
for (uint32_t i = 0; i < pmCount; i++)
|
||||
for (int i = pmCount - 1; i >= 0; --i)
|
||||
totalUnackedWork += mqe->unackedWork[i];
|
||||
|
||||
if (totalUnackedWork == 0)
|
||||
@ -769,7 +769,6 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
|
||||
void DistributedEngineComm::nextPMToACK(boost::shared_ptr<MQE> mqe, uint32_t maxAck, uint32_t* sockIndex,
|
||||
uint16_t* numToAck)
|
||||
{
|
||||
uint32_t i;
|
||||
uint32_t& nextIndex = mqe->ackSocketIndex;
|
||||
|
||||
/* Other threads can be touching mqe->unackedWork at the same time, but because of
|
||||
@ -790,7 +789,7 @@ void DistributedEngineComm::nextPMToACK(boost::shared_ptr<MQE> mqe, uint32_t max
|
||||
}
|
||||
else
|
||||
{
|
||||
for (i = 0; i < pmCount; i++)
|
||||
for (int i = pmCount - 1; i >= 0; --i)
|
||||
{
|
||||
uint32_t curVal = mqe->unackedWork[nextIndex];
|
||||
uint32_t unackedWork = (curVal > maxAck ? maxAck : curVal);
|
||||
@ -812,8 +811,7 @@ void DistributedEngineComm::nextPMToACK(boost::shared_ptr<MQE> mqe, uint32_t max
|
||||
}
|
||||
|
||||
cerr << "DEC::nextPMToACK(): Couldn't find a PM to ACK! ";
|
||||
|
||||
for (i = 0; i < pmCount; i++)
|
||||
for (int i = pmCount - 1; i >= 0; --i)
|
||||
cerr << mqe->unackedWork[i] << " ";
|
||||
|
||||
cerr << " max: " << maxAck;
|
||||
@ -1051,6 +1049,9 @@ void DistributedEngineComm::pushToTheLocalQueueAndNotifyRecv(const messageqcpp::
|
||||
inMemoryEM2PPExchCV_.notify_one();
|
||||
}
|
||||
|
||||
// This routine has an unexpected side-effect on its argument's SBS, namely
|
||||
// SBS is cleared when it is sent to PP at the same host. This fact forces any
|
||||
// code uses ::writeToClient to send to a local node in the last turn.
|
||||
int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_t senderUniqueID,
|
||||
bool doInterleaving)
|
||||
{
|
||||
|
10
mysql-test/columnstore/multinode/mcol-5279.result
Normal file
10
mysql-test/columnstore/multinode/mcol-5279.result
Normal file
@ -0,0 +1,10 @@
|
||||
DROP DATABASE IF EXISTS mcol_5279;
|
||||
CREATE DATABASE mcol_5279;
|
||||
USE mcol_5279;
|
||||
CREATE TABLE `dd`( `acct_sk` INT(11) UNSIGNED NOT NULL, `acct_id` VARCHAR(128) NOT NULL DEFAULT 'None', `db_source_sk` INT(11) UNSIGNED NOT NULL) ENGINE=Columnstore DEFAULT CHARSET=utf8mb4;
|
||||
CREATE TABLE `ff` ( `db_source_sk` INT(11) UNSIGNED NOT NULL, `acct_id` VARCHAR(128) NOT NULL DEFAULT 'None', `acct_sk` INT(11) UNSIGNED NOT NULL DEFAULT 0 ) ENGINE=Columnstore DEFAULT CHARSET=utf8mb4;
|
||||
INSERT INTO dd SELECT ROUND(RAND() * 10, 2), substring(MD5(RAND()),1,1), ROUND(RAND() * 100, 2) FROM seq_1_to_300000;
|
||||
INSERT INTO ff SELECT ROUND(RAND() * 10, 2), substring(MD5(RAND()),1,1), ROUND(RAND() * 100, 2) FROM seq_1_to_300000;
|
||||
SELECT * FROM (SELECT ff.acct_id, COUNT(1) FROM ff JOIN dd ON ff.acct_sk =dd.acct_sk AND ff.db_source_sk =dd.db_source_sk GROUP BY ff.acct_id LIMIT 10)s LIMIT 0;
|
||||
acct_id COUNT(1)
|
||||
DROP DATABASE mcol_5279;
|
25
mysql-test/columnstore/multinode/mcol-5279.test
Normal file
25
mysql-test/columnstore/multinode/mcol-5279.test
Normal file
@ -0,0 +1,25 @@
|
||||
#
|
||||
# Test based on Jira MCOL-5279
|
||||
# Author: Roman Nozdrin roman.nozdrin@mariadb.com
|
||||
#
|
||||
-- source ../include/have_columnstore.inc
|
||||
|
||||
--disable_warnings
|
||||
DROP DATABASE IF EXISTS mcol_5279;
|
||||
--enable_warnings
|
||||
|
||||
CREATE DATABASE mcol_5279;
|
||||
USE mcol_5279;
|
||||
|
||||
# This test makes sense when run on a multi-node cluster.
|
||||
CREATE TABLE `dd`( `acct_sk` INT(11) UNSIGNED NOT NULL, `acct_id` VARCHAR(128) NOT NULL DEFAULT 'None', `db_source_sk` INT(11) UNSIGNED NOT NULL) ENGINE=Columnstore DEFAULT CHARSET=utf8mb4;
|
||||
CREATE TABLE `ff` ( `db_source_sk` INT(11) UNSIGNED NOT NULL, `acct_id` VARCHAR(128) NOT NULL DEFAULT 'None', `acct_sk` INT(11) UNSIGNED NOT NULL DEFAULT 0 ) ENGINE=Columnstore DEFAULT CHARSET=utf8mb4;
|
||||
|
||||
INSERT INTO dd SELECT ROUND(RAND() * 10, 2), substring(MD5(RAND()),1,1), ROUND(RAND() * 100, 2) FROM seq_1_to_300000;
|
||||
INSERT INTO ff SELECT ROUND(RAND() * 10, 2), substring(MD5(RAND()),1,1), ROUND(RAND() * 100, 2) FROM seq_1_to_300000;
|
||||
|
||||
# The purpose is to run the statement that doesn't hang.
|
||||
SELECT * FROM (SELECT ff.acct_id, COUNT(1) FROM ff JOIN dd ON ff.acct_sk =dd.acct_sk AND ff.db_source_sk =dd.db_source_sk GROUP BY ff.acct_id LIMIT 10)s LIMIT 0;
|
||||
|
||||
# Clean UP
|
||||
DROP DATABASE mcol_5279;
|
Loading…
x
Reference in New Issue
Block a user