1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-10-30 07:25:34 +03:00
Files
mariadb-columnstore-engine/writeengine/client/we_clients.h
Serguey Zefirov a9f23729a9 feat(MCOL-6082) Cluster with read-only nodes should correctly work with DML
This patch changes logic from counting all nodes to counting only
read-write nodes when messaging about DML operations.

feat(MCOL-6082): Multiple readers of dbroots using OamCache logic

This patch introduces centralized logic of selecting what dbroot is
accessible in PrimProc on what node. The logic is in OamCache for time
being and can be moved later.

Fix build
2025-09-26 21:55:10 +04:00

191 lines
4.9 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: weclients.h 525 2010-01-19 23:18:05Z xlou $
//
/** @file */
#pragma once
#include <iostream>
#include <vector>
#include <queue>
#include <string>
#include <map>
#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>
#include <boost/scoped_array.hpp>
#include "bytestream.h"
//#include "we_message.h"
#include "threadsafequeue.h"
#include "rwlock_local.h"
#include "resourcemanager.h"
#define EXPORT
namespace WriteEngine
{
class WEClients
{
public:
/**
* Constructors
*/
EXPORT WEClients(int PrgmID);
EXPORT ~WEClients();
// static boost::mutex map_mutex;
EXPORT void addQueue(uint32_t key);
EXPORT void removeQueue(uint32_t key);
EXPORT void shutdownQueue(uint32_t key);
/** @brief read a Write Engine Server response
*
* Returns the next message in the inbound queue for unique ids.
* @param bs A pointer to the ByteStream to fill in.
* @note: saves a copy vs read(uint32_t, uint32_t).
*/
EXPORT void read(uint32_t key, messageqcpp::SBS&);
/** @brief write function to write to specified PM
*/
EXPORT void write(const messageqcpp::ByteStream& msg, uint32_t connection);
/** @brief write function to write to all PMs
*/
EXPORT void write_to_all(const messageqcpp::ByteStream& msg);
/** @brief Shutdown this object
*
* Closes all the connections created during Setup() and cleans up other stuff.
*/
EXPORT int Close();
/** @brief Start listening for Write Engine Server responses
*
* Starts the current thread listening on the client socket for Write Engine Server response messages. Will
* not return until busy() returns false or a zero-length response is received.
*/
EXPORT void Listen(boost::shared_ptr<messageqcpp::MessageQueueClient> client, uint32_t connIndex);
/** @brief set/unset busy flag
*
* Set or unset the busy flag so Listen() can return.
*/
EXPORT void makeBusy(bool b)
{
fBusy = b;
}
/** @brief fBusy accessor
*
*/
EXPORT bool Busy() const
{
return fBusy;
}
EXPORT void Setup();
uint64_t connectedWEServers() const
{
return fPmConnections.size();
}
/** @brief accessor
*/
uint32_t getPmCount()
{
return pmCount;
}
uint32_t getRWConnectionsCount()
{
uint32_t count = 0;
for (uint32_t i = 0; i < fPmConnections.size(); i++)
{
count += fPmConnections[i] != nullptr;
}
return count;
}
bool isConnectionReadonly(uint32_t connection);
private:
WEClients(const WEClients& weClient);
WEClients& operator=(const WEClients& weClient);
typedef std::vector<boost::thread*> ReaderList;
typedef std::map<unsigned, boost::shared_ptr<messageqcpp::MessageQueueClient> > ClientList;
// A queue of ByteStreams coming in from Write Engine Server
typedef joblist::ThreadSafeQueue<messageqcpp::SBS> WESMsgQueue;
/* To keep some state associated with the connection */
struct MQE
{
MQE(uint32_t pCount) : ackSocketIndex(0), pmCount(pCount)
{
unackedWork.reset(new volatile uint32_t[pmCount]);
memset((void*)unackedWork.get(), 0, pmCount * sizeof(uint32_t));
}
WESMsgQueue queue;
uint32_t ackSocketIndex;
boost::scoped_array<volatile uint32_t> unackedWork;
uint32_t pmCount;
};
// The mapping of session ids to StepMsgQueueLists
typedef std::map<unsigned, boost::shared_ptr<MQE> > MessageQueueMap;
void StartClientListener(boost::shared_ptr<messageqcpp::MessageQueueClient> cl, uint32_t connIndex);
/** @brief Add a message to the queue
*
*/
void addDataToOutput(messageqcpp::SBS, uint32_t connIndex);
int fPrgmID;
ClientList fPmConnections; // all the Write Engine servers
ReaderList fWESReader; // all the reader threads for the pm servers
MessageQueueMap
fSessionMessages; // place to put messages from the pm server to be returned by the Read method
boost::mutex fMlock; // sessionMessages mutex
std::vector<boost::shared_ptr<boost::mutex> > fWlock; // WES socket write mutexes
bool fBusy;
volatile uint32_t closingConnection;
uint32_t pmCount;
boost::mutex fOnErrMutex; // to lock function scope to reset pmconnections under error condition
boost::mutex ackLock;
public:
enum
{
DDLPROC = 0,
SPLITTER,
DMLPROC,
BATCHINSERTPROC
};
};
} // namespace WriteEngine
#undef EXPORT