You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-11-03 17:13:17 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			179 lines
		
	
	
		
			4.7 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			179 lines
		
	
	
		
			4.7 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/* Copyright (C) 2014 InfiniDB, Inc.
 | 
						|
 | 
						|
   This program is free software; you can redistribute it and/or
 | 
						|
   modify it under the terms of the GNU General Public License
 | 
						|
   as published by the Free Software Foundation; version 2 of
 | 
						|
   the License.
 | 
						|
 | 
						|
   This program is distributed in the hope that it will be useful,
 | 
						|
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
   GNU General Public License for more details.
 | 
						|
 | 
						|
   You should have received a copy of the GNU General Public License
 | 
						|
   along with this program; if not, write to the Free Software
 | 
						|
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | 
						|
   MA 02110-1301, USA. */
 | 
						|
 | 
						|
// $Id: weclients.h 525 2010-01-19 23:18:05Z xlou $
 | 
						|
//
 | 
						|
/** @file */
 | 
						|
 | 
						|
#pragma once
 | 
						|
 | 
						|
#include <iostream>
 | 
						|
#include <vector>
 | 
						|
#include <queue>
 | 
						|
#include <string>
 | 
						|
#include <map>
 | 
						|
#include <boost/thread.hpp>
 | 
						|
#include <boost/thread/condition.hpp>
 | 
						|
#include <boost/scoped_array.hpp>
 | 
						|
 | 
						|
#include "bytestream.h"
 | 
						|
//#include "we_message.h"
 | 
						|
#include "threadsafequeue.h"
 | 
						|
#include "rwlock_local.h"
 | 
						|
#include "resourcemanager.h"
 | 
						|
 | 
						|
#define EXPORT
 | 
						|
 | 
						|
namespace WriteEngine
 | 
						|
{
 | 
						|
class WEClients
 | 
						|
{
 | 
						|
 public:
 | 
						|
  /**
 | 
						|
   * Constructors
 | 
						|
   */
 | 
						|
  EXPORT WEClients(int PrgmID);
 | 
						|
  EXPORT ~WEClients();
 | 
						|
 | 
						|
  // static boost::mutex map_mutex;
 | 
						|
  EXPORT void addQueue(uint32_t key);
 | 
						|
  EXPORT void removeQueue(uint32_t key);
 | 
						|
  EXPORT void shutdownQueue(uint32_t key);
 | 
						|
 | 
						|
  /** @brief read a Write Engine Server response
 | 
						|
   *
 | 
						|
   * Returns the next message in the inbound queue for unique ids.
 | 
						|
   * @param bs A pointer to the ByteStream to fill in.
 | 
						|
   * @note: saves a copy vs read(uint32_t, uint32_t).
 | 
						|
   */
 | 
						|
  EXPORT void read(uint32_t key, messageqcpp::SBS&);
 | 
						|
 | 
						|
  /** @brief write function to write to specified PM
 | 
						|
   */
 | 
						|
  EXPORT void write(const messageqcpp::ByteStream& msg, uint32_t connection);
 | 
						|
 | 
						|
  /** @brief write function to write to all PMs
 | 
						|
   */
 | 
						|
  EXPORT void write_to_all(const messageqcpp::ByteStream& msg);
 | 
						|
 | 
						|
  /** @brief Shutdown this object
 | 
						|
   *
 | 
						|
   * Closes all the connections created during Setup() and cleans up other stuff.
 | 
						|
   */
 | 
						|
  EXPORT int Close();
 | 
						|
 | 
						|
  /** @brief Start listening for Write Engine Server responses
 | 
						|
   *
 | 
						|
   * Starts the current thread listening on the client socket for Write Engine Server response messages. Will
 | 
						|
   * not return until busy() returns false or a zero-length response is received.
 | 
						|
   */
 | 
						|
  EXPORT void Listen(boost::shared_ptr<messageqcpp::MessageQueueClient> client, uint32_t connIndex);
 | 
						|
 | 
						|
  /** @brief set/unset busy flag
 | 
						|
   *
 | 
						|
   * Set or unset the busy flag so Listen() can return.
 | 
						|
   */
 | 
						|
  EXPORT void makeBusy(bool b)
 | 
						|
  {
 | 
						|
    fBusy = b;
 | 
						|
  }
 | 
						|
 | 
						|
  /** @brief fBusy accessor
 | 
						|
   *
 | 
						|
   */
 | 
						|
  EXPORT bool Busy() const
 | 
						|
  {
 | 
						|
    return fBusy;
 | 
						|
  }
 | 
						|
 | 
						|
  EXPORT void Setup();
 | 
						|
 | 
						|
  uint64_t connectedWEServers() const
 | 
						|
  {
 | 
						|
    return fPmConnections.size();
 | 
						|
  }
 | 
						|
 | 
						|
  /** @brief accessor
 | 
						|
   */
 | 
						|
  uint32_t getPmCount()
 | 
						|
  {
 | 
						|
    return pmCount;
 | 
						|
  }
 | 
						|
 | 
						|
 private:
 | 
						|
  WEClients(const WEClients& weClient);
 | 
						|
  WEClients& operator=(const WEClients& weClient);
 | 
						|
  typedef std::vector<boost::thread*> ReaderList;
 | 
						|
  typedef std::map<unsigned, boost::shared_ptr<messageqcpp::MessageQueueClient> > ClientList;
 | 
						|
 | 
						|
  // A queue of ByteStreams coming in from Write Engine Server
 | 
						|
  typedef joblist::ThreadSafeQueue<messageqcpp::SBS> WESMsgQueue;
 | 
						|
 | 
						|
  /* To keep some state associated with the connection */
 | 
						|
  struct MQE
 | 
						|
  {
 | 
						|
    MQE(uint32_t pCount) : ackSocketIndex(0), pmCount(pCount)
 | 
						|
    {
 | 
						|
      unackedWork.reset(new volatile uint32_t[pmCount]);
 | 
						|
      memset((void*)unackedWork.get(), 0, pmCount * sizeof(uint32_t));
 | 
						|
    }
 | 
						|
    WESMsgQueue queue;
 | 
						|
    uint32_t ackSocketIndex;
 | 
						|
    boost::scoped_array<volatile uint32_t> unackedWork;
 | 
						|
    uint32_t pmCount;
 | 
						|
  };
 | 
						|
 | 
						|
  // The mapping of session ids to StepMsgQueueLists
 | 
						|
  typedef std::map<unsigned, boost::shared_ptr<MQE> > MessageQueueMap;
 | 
						|
 | 
						|
  void StartClientListener(boost::shared_ptr<messageqcpp::MessageQueueClient> cl, uint32_t connIndex);
 | 
						|
 | 
						|
  /** @brief Add a message to the queue
 | 
						|
   *
 | 
						|
   */
 | 
						|
  void addDataToOutput(messageqcpp::SBS, uint32_t connIndex);
 | 
						|
 | 
						|
  int fPrgmID;
 | 
						|
 | 
						|
  ClientList fPmConnections;  // all the Write Engine servers
 | 
						|
  ReaderList fWESReader;      // all the reader threads for the pm servers
 | 
						|
  MessageQueueMap
 | 
						|
      fSessionMessages;  // place to put messages from the pm server to be returned by the Read method
 | 
						|
  boost::mutex fMlock;   // sessionMessages mutex
 | 
						|
  std::vector<boost::shared_ptr<boost::mutex> > fWlock;  // WES socket write mutexes
 | 
						|
  bool fBusy;
 | 
						|
  volatile uint32_t closingConnection;
 | 
						|
  uint32_t pmCount;
 | 
						|
  boost::mutex fOnErrMutex;  // to lock function scope to reset pmconnections under error condition
 | 
						|
 | 
						|
  boost::mutex ackLock;
 | 
						|
 | 
						|
 public:
 | 
						|
  enum
 | 
						|
  {
 | 
						|
    DDLPROC = 0,
 | 
						|
    SPLITTER,
 | 
						|
    DMLPROC,
 | 
						|
    BATCHINSERTPROC
 | 
						|
  };
 | 
						|
};
 | 
						|
 | 
						|
}  // namespace WriteEngine
 | 
						|
 | 
						|
#undef EXPORT
 | 
						|
 |