1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-07 03:22:57 +03:00

fix(DEC): MCOL-5602 fixing potentially endless loop in DEC (#3049)

This commit is contained in:
drrtuy
2023-12-05 17:39:24 +02:00
committed by GitHub
parent 63b032e3fd
commit 1d416bc6ed
3 changed files with 73 additions and 79 deletions

View File

@@ -225,6 +225,7 @@ class DistributedEngineComm
// A queue of ByteStreams coming in from PrimProc heading for a JobStep
typedef ThreadSafeQueue<messageqcpp::SBS> StepMsgQueue;
using AtomicSizeVec = std::vector<std::atomic<size_t>>;
/* To keep some state associated with the connection. These aren't copyable. */
struct MQE : public boost::noncopyable
@@ -235,7 +236,7 @@ class DistributedEngineComm
messageqcpp::Stats stats;
StepMsgQueue queue;
uint32_t ackSocketIndex;
boost::scoped_array<volatile uint32_t> unackedWork;
AtomicSizeVec unackedWork;
boost::scoped_array<uint32_t> interleaver;
uint32_t initialConnectionId;
uint32_t pmCount;
@@ -283,7 +284,7 @@ class DistributedEngineComm
std::mutex fMlock; // sessionMessages mutex
std::vector<std::shared_ptr<std::mutex>> fWlock; // PrimProc socket write mutexes
bool fBusy;
volatile uint32_t pmCount;
std::atomic<uint32_t> pmCount;
boost::mutex fOnErrMutex; // to lock function scope to reset pmconnections under error condition
boost::mutex fSetupMutex;
@@ -305,7 +306,7 @@ class DistributedEngineComm
void sendAcks(uint32_t uniqueID, const std::vector<messageqcpp::SBS>& msgs, boost::shared_ptr<MQE> mqe,
size_t qSize);
void nextPMToACK(boost::shared_ptr<MQE> mqe, uint32_t maxAck, uint32_t* sockIndex, uint16_t* numToAck);
size_t subsMsgCounterAndRotatePM(boost::shared_ptr<MQE> mqe, const size_t maxAck, uint32_t* sockIndex);
void setFlowControl(bool enable, uint32_t uniqueID, boost::shared_ptr<MQE> mqe);
void doHasBigMsgs(boost::shared_ptr<MQE> mqe, uint64_t targetSize);
boost::mutex ackLock;