1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-24 22:42:05 +03:00
Files
mariadb-columnstore-engine/versioning/BRM/sessionmanagerserver.h
Denis Khalikov 3fcb9b66f5 MCOL-5555 Add support for startreadonly command.
This patch adds support for `startreadonly` command which waits
until all active cpimport jobs are done and then puts controller node to readonly
mode.
2023-10-16 16:11:12 +03:00

301 lines
10 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/******************************************************************************
* $Id: sessionmanagerserver.h 1906 2013-06-14 19:15:32Z rdempsey $
*
*****************************************************************************/
/** @file
* class SessionManagerServer interface
*/
#pragma once
#include <map>
#include <condition_variable>
#include <unordered_set>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include "calpontsystemcatalog.h"
#include "brmtypes.h"
#include "atomicops.h"
#define EXPORT
namespace BRM
{
/** @brief Issues transaction IDs and keeps track of the current system-wide version ID.
*
* This class's primary purpose is to keep track of the current system-wide version ID and issue transaction
* IDs. It can be used simultaneously by multiple threads and processes.
*
* It uses system-wide semaphores and shared memory segments for IPC.
* It allocates both if they don't already exist, but it only deallocates
* the shmseg (saves it to a file so state isn't lost). It may be a waste of
* CPU time to deallocate either while the system is running. It may be more
* appropriate to do that in the DB shut down script.
*
* Note: Added a macro 'DESTROYSHMSEG' which enables/disables the code to load/save
* and deallocate the shared memory segment.
*
* This class uses 3 parameters from the Columnstore.xml file:
* SessionManager/MaxConcurrentTransactions: defines how many active transactions the
* system should support. When a new request comes in and there are already
* MaxConcurrentTransactions active, the new request blocks by default. The
* default value is 1000.
* SessionManager/SharedMemoryTmpFile: the file to store the shared memory segment
* data in between invocations if DESTROYSHMSEG is defined below. The
* default is /tmp/ColumnstoreShm.
* SessionManager/TxnIDFile: the file to store the last transaction ID issued
*/
/*
* Define DESTROYSHMSEG if the SM should deallocate the shared memory segment
* after the last reference has died. This also enables the load/save
* operations. If it is undefined, the segment is never deallocated
* by SM, and is therefore more efficient between instances.
* NOTE: It seems that valgrind doesn't work well with shared memory segments.
* Specifically, when the code executes shmctl(IPC_RMID), which normally means
* "destroy the segment when refcount == 0", the segment is destroyed
* immediately, causing all subsequent references to fail. This only affects
* 'leakcheck'.
*/
//#define DESTROYSHMSEG
class SessionManagerServer
{
public:
/** @brief SID = Session ID */
typedef uint32_t SID;
// State flags. These bit values are stored in Overlay::state and reflect the current state of the system
static const uint32_t SS_READY; /// bit 0 => Set by dmlProc one time when dmlProc is ready
static const uint32_t SS_SUSPENDED; /// bit 1 => Set by console when the system has been suspended by user.
static const uint32_t
SS_SUSPEND_PENDING; /// bit 2 => Set by console when user wants to suspend, but writing is occuring.
static const uint32_t
SS_SHUTDOWN_PENDING; /// bit 3 => Set by console when user wants to shutdown, but writing is occuring.
static const uint32_t
SS_ROLLBACK; /// bit 4 => In combination with a PENDING flag, force a rollback as soom as possible.
static const uint32_t
SS_FORCE; /// bit 5 => In combination with a PENDING flag, force a shutdown without rollback.
static const uint32_t SS_QUERY_READY; /// bit 6 => Set by ProcManager when system is ready for queries
/** @brief Constructor
*
* This sets up the shared memory segment & semaphores used by this
* instance. No additional set-up calls are necessary.
* @note throws ios_base::failure on file IO error, runtime_error for
* other types of errors. It might be worthwhile to define additional
* exceptions for all the different types of failures that can happen
* here.
*/
EXPORT SessionManagerServer();
/** @brief Destructor
*
* This detaches the shared memory segment. If DESTROYSHMSEG is defined and this
* is the last reference to it, it will be saved to disk and destroyed.
* It does not destroy the semaphores. Those persist until the system
* is shut down.
*/
virtual ~SessionManagerServer();
/** @brief Gets the current version ID
*
* Gets the current version ID.
*/
EXPORT const QueryContext verID();
/** @brief Gets the current version ID
*
* Gets the current version ID.
*/
EXPORT const QueryContext sysCatVerID();
/** @brief Gets a new Transaction ID
*
* Makes a new Transaction ID, associates it with the given session
* and increments the system-wide version ID
* @note This blocks until (# active transactions) \< MaxTxns unless block == false
* @note Throws runtime_error on semaphore-related error
* @note This will always return a valid TxnID unless block == false, in which case
* it will return a valid TxnID iff it doesn't have to block.
* @param session The session ID to associate with the transaction ID
* @param block If true, this will block until there is an available slot, otherwise
* it will return a TxnID marked invalid, signifying that it could not start a new transaction
* @return The new transaction ID.
*/
EXPORT const TxnID newTxnID(const SID session, bool block = true, bool isDDL = false);
// Adds a new job into `active` cpimport job list and return id of that job.
EXPORT uint32_t newCpimportJob();
// Removes the given `jobId` from `active` cpimort job list.
EXPORT void finishCpimortJob(uint32_t jobId);
// Clears all active cpimport jobs.
EXPORT void clearAllCpimportJobs();
/** @brief Record that a transaction has been committed
*
* Record that a transaction has been committed.
* @note Throws runtime_error on a semaphore-related error, invalid_argument
* when txnid can't be found
* @param txnid The committed transaction ID. This is marked invalid
* on return.
*/
void committed(TxnID& txn)
{
finishTransaction(txn);
}
/** @brief Record that a transaction has been rolled back
*
* Record that a transaction has been rolled back.
* @note Throws runtime_error on a semaphore-related error, invalid_argument
* when txnid can't be found
* @param txnid The rolled back transaction ID. This is marked invalid
* on return.
*/
void rolledback(TxnID& txn)
{
finishTransaction(txn);
}
/** @brief Gets the transaction ID associated with a given session ID
*
* Gets the transaction ID associated with a given session ID.
* @note Throws runtime_error on a semaphore-related error
* @param session The session ID
* @return A valid transaction ID if there's an association; an invalid
* one if there isn't.
*/
EXPORT const TxnID getTxnID(const SID session);
/** @brief Gets an array containing all active SID-TID associations
*
* Gets an array containing the SID-TID associations of all active
* transactions. This is intended to be used by another object
* that can determine which sessions are still live and which
* transactions need to go away.
* @note Throws runtime_error on a semaphore-related error
* @param len (out) the length of the array
* @return A pointer to the array. Note: The caller is responsible for
* deallocating it. Use delete[].
*/
EXPORT std::shared_ptr<SIDTIDEntry[]> SIDTIDMap(int& len);
/**
* get a unique 32-bit number
*/
uint32_t getUnique32()
{
return atomicops::atomicInc(&unique32);
}
/**
* get a unique 64-bit number
*/
uint64_t getUnique64()
{
return atomicops::atomicInc(&unique64);
}
/** @brief Resets the semaphores to their original state. For testing only.
*
* Resets the semaphores to their original state. For testing only.
*/
EXPORT void reset();
/**
* get the Txn ID filename
*/
std::string getTxnIDFilename() const
{
return txnidFilename;
}
/** @brief set system state info
*
* Sets the bits on in Overlay::systemState that are found on in
* state. That is Overlay::systemState | state.
*/
EXPORT void setSystemState(uint32_t state);
/** @brief set system state info
*
* Clears the bits on in Overlay::systemState that are found on
* in state. That is Overlay::systemState & ~state.
*/
EXPORT void clearSystemState(uint32_t state);
/** @brief get system state info
*
* Returns the Overlay::systemState flags
*/
void getSystemState(uint32_t& state)
{
state = systemState;
}
/**
* get the number of active txn's
*/
EXPORT uint32_t getTxnCount();
EXPORT uint32_t getCpimportJobsCount();
private:
SessionManagerServer(const SessionManagerServer&);
SessionManagerServer& operator=(const SessionManagerServer&);
void loadState();
void saveSystemState();
void finishTransaction(TxnID& txn);
void saveSMTxnIDAndState();
volatile uint32_t unique32;
volatile uint64_t unique64;
int maxTxns; // the maximum number of concurrent transactions
std::string txnidFilename;
execplan::CalpontSystemCatalog::SCN _verID;
execplan::CalpontSystemCatalog::SCN _sysCatVerID;
uint32_t systemState;
std::map<SID, execplan::CalpontSystemCatalog::SCN> activeTxns;
typedef std::map<SID, execplan::CalpontSystemCatalog::SCN>::iterator iterator;
boost::mutex mutex;
boost::condition_variable condvar; // used to synthesize a semaphore
uint32_t semValue;
std::unordered_set<uint32_t> activeCpimportJobs;
uint32_t cpimportJobId{0};
std::mutex cpimportMutex;
};
} // namespace BRM
#undef EXPORT