mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
As part of the charset support, a call to MY_INIT() was added at the initialization of the above processes. This call initializes the MySQL thread environment required by the charset library. However, the accompanying my_end() call required to terminate this thread environment was not added at the termination of these process, hence leaking resources. As a fix, we move the MY_INIT() calls to the Child() functions of these services and also add the missing my_end() call.
726 lines
18 KiB
C++
726 lines
18 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
Copyright (C) 2016-2022 MariaDB Corporation
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/***********************************************************************
|
|
* $Id: primproc.cpp 2147 2013-08-14 20:44:44Z bwilkinson $
|
|
*
|
|
*
|
|
***********************************************************************/
|
|
#include <unistd.h>
|
|
#include <string>
|
|
#include <iostream>
|
|
#ifdef QSIZE_DEBUG
|
|
#include <iomanip>
|
|
#include <fstream>
|
|
#endif
|
|
#include <csignal>
|
|
#include <sys/time.h>
|
|
#include <sys/resource.h>
|
|
#include <tr1/unordered_set>
|
|
|
|
#include <clocale>
|
|
#include <iterator>
|
|
#include <algorithm>
|
|
#include <thread>
|
|
#include <mutex>
|
|
#include <condition_variable>
|
|
//#define NDEBUG
|
|
#include <cassert>
|
|
using namespace std;
|
|
|
|
#include <boost/thread.hpp>
|
|
using namespace boost;
|
|
|
|
#include "configcpp.h"
|
|
using namespace config;
|
|
|
|
#include "messageids.h"
|
|
using namespace logging;
|
|
|
|
#include "primproc.h"
|
|
#include "primitiveserver.h"
|
|
#include "MonitorProcMem.h"
|
|
#include "pp_logger.h"
|
|
#include "umsocketselector.h"
|
|
using namespace primitiveprocessor;
|
|
|
|
#include "archcheck.h"
|
|
using namespace archcheck;
|
|
|
|
#include "liboamcpp.h"
|
|
using namespace oam;
|
|
|
|
#include "IDBPolicy.h"
|
|
using namespace idbdatafile;
|
|
|
|
#include "cgroupconfigurator.h"
|
|
|
|
#include "crashtrace.h"
|
|
#include "installdir.h"
|
|
|
|
#include "mariadb_my_sys.h"
|
|
|
|
#include "spinlock.h"
|
|
#include "service.h"
|
|
#include "serviceexemgr.h"
|
|
|
|
namespace primitiveprocessor
|
|
{
|
|
extern uint32_t BPPCount;
|
|
extern uint32_t blocksReadAhead;
|
|
extern uint32_t defaultBufferSize;
|
|
extern uint32_t connectionsPerUM;
|
|
extern uint32_t highPriorityThreads;
|
|
extern uint32_t medPriorityThreads;
|
|
extern uint32_t lowPriorityThreads;
|
|
extern int directIOFlag;
|
|
extern int noVB;
|
|
|
|
DebugLevel gDebugLevel;
|
|
Logger* mlp;
|
|
|
|
bool isDebug(const DebugLevel level)
|
|
{
|
|
return level <= gDebugLevel;
|
|
}
|
|
|
|
} // namespace primitiveprocessor
|
|
|
|
namespace
|
|
{
|
|
int toInt(const string& val)
|
|
{
|
|
if (val.length() == 0)
|
|
return -1;
|
|
|
|
return static_cast<int>(Config::fromText(val));
|
|
}
|
|
|
|
void setupSignalHandlers()
|
|
{
|
|
struct sigaction ign;
|
|
memset(&ign, 0, sizeof(ign));
|
|
ign.sa_handler = SIG_IGN;
|
|
sigaction(SIGUSR2, &ign, 0);
|
|
|
|
sigset_t sigset;
|
|
sigemptyset(&sigset);
|
|
sigaddset(&sigset, SIGPIPE);
|
|
sigaddset(&sigset, SIGUSR1);
|
|
sigaddset(&sigset, SIGUSR2);
|
|
sigprocmask(SIG_BLOCK, &sigset, 0);
|
|
|
|
}
|
|
|
|
int8_t setupCwd(Config* cf)
|
|
{
|
|
string workdir = startup::StartUp::tmpDir();
|
|
|
|
if (workdir.length() == 0)
|
|
workdir = ".";
|
|
|
|
int8_t rc = chdir(workdir.c_str());
|
|
|
|
if (rc < 0 || access(".", W_OK) != 0)
|
|
rc = chdir("/tmp");
|
|
|
|
return rc;
|
|
}
|
|
|
|
int setupResources()
|
|
{
|
|
struct rlimit rlim;
|
|
|
|
if (getrlimit(RLIMIT_NOFILE, &rlim) != 0)
|
|
{
|
|
return -1;
|
|
}
|
|
|
|
rlim.rlim_cur = rlim.rlim_max = 65536;
|
|
|
|
if (setrlimit(RLIMIT_NOFILE, &rlim) != 0)
|
|
{
|
|
return -2;
|
|
}
|
|
|
|
if (getrlimit(RLIMIT_NOFILE, &rlim) != 0)
|
|
{
|
|
return -3;
|
|
}
|
|
|
|
if (rlim.rlim_cur != 65536)
|
|
{
|
|
return -4;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
#ifdef QSIZE_DEBUG
|
|
class QszMonThd
|
|
{
|
|
public:
|
|
QszMonThd(PrimitiveServer* psp, ofstream* qszlog) : fPsp(psp), fQszLog(qszlog)
|
|
{
|
|
}
|
|
|
|
~QszMonThd()
|
|
{
|
|
}
|
|
|
|
void operator()()
|
|
{
|
|
utils::setThreadName("QszMonThd");
|
|
for (;;)
|
|
{
|
|
uint32_t qd = fPsp->getProcessorThreadPool()->getWaiting();
|
|
|
|
if (fQszLog)
|
|
{
|
|
// Get a timestamp for output.
|
|
struct tm tm;
|
|
struct timeval tv;
|
|
|
|
gettimeofday(&tv, 0);
|
|
localtime_r(&tv.tv_sec, &tm);
|
|
|
|
ostringstream oss;
|
|
oss << setfill('0') << setw(2) << tm.tm_hour << ':' << setw(2) << tm.tm_min << ':' << setw(2)
|
|
<< tm.tm_sec << '.' << setw(4) << tv.tv_usec / 100;
|
|
|
|
*fQszLog << oss.str() << ' ' << qd << endl;
|
|
}
|
|
|
|
struct timespec req = {0, 1000 * 100}; // 100 usec
|
|
|
|
nanosleep(&req, 0);
|
|
}
|
|
}
|
|
|
|
private:
|
|
// defaults okay
|
|
// QszMonThd(const QszMonThd& rhs);
|
|
// QszMonThd& operator=(const QszMonThd& rhs);
|
|
const PrimitiveServer* fPsp;
|
|
ofstream* fQszLog;
|
|
};
|
|
#endif
|
|
|
|
#define DUMP_CACHE_CONTENTS
|
|
#ifdef DUMP_CACHE_CONTENTS
|
|
void* waitForSIGUSR1(void* p)
|
|
{
|
|
utils::setThreadName("waitForSIGUSR1");
|
|
#if defined(__LP64__) || defined(_MSC_VER)
|
|
ptrdiff_t tmp = reinterpret_cast<ptrdiff_t>(p);
|
|
int cacheCount = static_cast<int>(tmp);
|
|
#else
|
|
int cacheCount = reinterpret_cast<int>(p);
|
|
#endif
|
|
sigset_t oset;
|
|
int rec_sig;
|
|
int32_t rpt_state = 0;
|
|
sigfillset(&oset);
|
|
sigdelset(&oset, SIGUSR1);
|
|
sigdelset(&oset, SIGUSR2);
|
|
pthread_sigmask(SIG_SETMASK, &oset, 0);
|
|
|
|
sigemptyset(&oset);
|
|
sigaddset(&oset, SIGUSR1);
|
|
sigaddset(&oset, SIGUSR2);
|
|
|
|
for (;;)
|
|
{
|
|
sigwait(&oset, &rec_sig);
|
|
|
|
if (rec_sig == SIGUSR1)
|
|
{
|
|
ostringstream out;
|
|
|
|
for (int i = 0; i < cacheCount; i++)
|
|
{
|
|
BRPp[i]->formatLRUList(out);
|
|
cout << out.str() << "###" << endl;
|
|
}
|
|
}
|
|
else if (rec_sig == SIGUSR2)
|
|
{
|
|
// is reporting currently on?
|
|
rpt_state = BRPp[0]->ReportingFrequency();
|
|
|
|
if (rpt_state > 0)
|
|
rpt_state = 0; // turn reporting off
|
|
else
|
|
rpt_state = 1; // fbm will set to the value from config file
|
|
|
|
for (int i = 0; i < cacheCount; i++)
|
|
BRPp[i]->setReportingFrequency(rpt_state);
|
|
|
|
cout << "@@@" << endl;
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
#endif
|
|
|
|
} // namespace
|
|
|
|
ServicePrimProc* ServicePrimProc::fInstance = nullptr;
|
|
ServicePrimProc* ServicePrimProc::instance()
|
|
{
|
|
if (!fInstance)
|
|
fInstance = new ServicePrimProc();
|
|
|
|
return fInstance;
|
|
}
|
|
|
|
int ServicePrimProc::Child()
|
|
{
|
|
Config* cf = Config::makeConfig();
|
|
|
|
setupSignalHandlers();
|
|
|
|
int err = 0;
|
|
err = setupCwd(cf);
|
|
|
|
// Initialize the charset library
|
|
MY_INIT("PrimProc");
|
|
|
|
mlp = new primitiveprocessor::Logger();
|
|
|
|
if (!m_debug)
|
|
err = setupResources();
|
|
string errMsg;
|
|
|
|
switch (err)
|
|
{
|
|
case -1:
|
|
case -3: errMsg = "Error getting file limits, please see non-root install documentation"; break;
|
|
|
|
case -2: errMsg = "Error setting file limits, please see non-root install documentation"; break;
|
|
|
|
case -4:
|
|
errMsg = "Could not install file limits to required value, please see non-root install documentation";
|
|
break;
|
|
|
|
case -5: errMsg = "Could not change into working directory"; break;
|
|
|
|
default: break;
|
|
}
|
|
|
|
if (err < 0)
|
|
{
|
|
Oam oam;
|
|
mlp->logMessage(errMsg);
|
|
cerr << errMsg << endl;
|
|
|
|
NotifyServiceInitializationFailed();
|
|
|
|
// Free up resources allocated by MY_INIT() above.
|
|
my_end(0);
|
|
|
|
return 2;
|
|
}
|
|
utils::USpaceSpinLock startupRaceLock(getStartupRaceFlag());
|
|
std::thread exeMgrThread(
|
|
[this, cf]()
|
|
{
|
|
exemgr::Opt opt;
|
|
exemgr::globServiceExeMgr = new exemgr::ServiceExeMgr(opt, cf);
|
|
// primitive delay to avoid 'not connected to PM' log error messages
|
|
// from EM. PrimitiveServer::start() releases SpinLock after sockets
|
|
// are available.
|
|
utils::USpaceSpinLock startupRaceLock(this->getStartupRaceFlag());
|
|
exemgr::globServiceExeMgr->Child();
|
|
});
|
|
|
|
int serverThreads = 1;
|
|
int serverQueueSize = 10;
|
|
int processorWeight = 8 * 1024;
|
|
int processorQueueSize = 10 * 1024;
|
|
int64_t BRPBlocksPct = 70;
|
|
uint32_t BRPBlocks = 1887437;
|
|
int BRPThreads = 16;
|
|
int cacheCount = 1;
|
|
int maxBlocksPerRead = 256; // 1MB
|
|
bool rotatingDestination = false;
|
|
uint32_t deleteBlocks = 128;
|
|
bool PTTrace = false;
|
|
string strTemp;
|
|
int priority = -1;
|
|
const string primitiveServers("PrimitiveServers");
|
|
const string jobListStr("JobList");
|
|
const string dbbc("DBBC");
|
|
const string ExtentMapStr("ExtentMap");
|
|
uint64_t extentRows = 8 * 1024 * 1024;
|
|
uint64_t MaxExtentSize = 0;
|
|
double prefetchThreshold;
|
|
uint64_t PMSmallSide = 67108864;
|
|
BPPCount = 16;
|
|
int numCores = -1;
|
|
int configNumCores = -1;
|
|
uint32_t highPriorityPercentage, medPriorityPercentage, lowPriorityPercentage;
|
|
utils::CGroupConfigurator cg;
|
|
|
|
gDebugLevel = primitiveprocessor::NONE;
|
|
|
|
int temp = toInt(cf->getConfig(primitiveServers, "ServerThreads"));
|
|
|
|
if (temp > 0)
|
|
serverThreads = temp;
|
|
|
|
temp = toInt(cf->getConfig(primitiveServers, "ServerQueueSize"));
|
|
|
|
if (temp > 0)
|
|
serverQueueSize = temp;
|
|
|
|
temp = toInt(cf->getConfig(primitiveServers, "ProcessorThreshold"));
|
|
|
|
if (temp > 0)
|
|
processorWeight = temp;
|
|
|
|
temp = toInt(cf->getConfig(primitiveServers, "ProcessorQueueSize"));
|
|
|
|
if (temp > 0)
|
|
processorQueueSize = temp;
|
|
|
|
temp = toInt(cf->getConfig(primitiveServers, "DebugLevel"));
|
|
|
|
if (temp > 0)
|
|
gDebugLevel = (DebugLevel)temp;
|
|
|
|
highPriorityPercentage = 0;
|
|
temp = toInt(cf->getConfig(primitiveServers, "HighPriorityPercentage"));
|
|
|
|
if (temp >= 0)
|
|
highPriorityPercentage = temp;
|
|
|
|
medPriorityPercentage = 0;
|
|
temp = toInt(cf->getConfig(primitiveServers, "MediumPriorityPercentage"));
|
|
|
|
if (temp >= 0)
|
|
medPriorityPercentage = temp;
|
|
|
|
lowPriorityPercentage = 0;
|
|
temp = toInt(cf->getConfig(primitiveServers, "LowPriorityPercentage"));
|
|
|
|
if (temp >= 0)
|
|
lowPriorityPercentage = temp;
|
|
|
|
temp = toInt(cf->getConfig(ExtentMapStr, "ExtentRows"));
|
|
|
|
if (temp > 0)
|
|
extentRows = temp;
|
|
|
|
temp = toInt(cf->getConfig(primitiveServers, "ConnectionsPerPrimProc"));
|
|
|
|
if (temp > 0)
|
|
connectionsPerUM = temp;
|
|
else
|
|
connectionsPerUM = 1;
|
|
|
|
// set to smallest extent size
|
|
// do not allow to read beyond the end of an extent
|
|
const int MaxReadAheadSz = (extentRows) / BLOCK_SIZE;
|
|
// Max size for a primitive message column buffer. Must be big enough
|
|
// to accomodate 8192 values of the widest non-dict column + 8192 RIDs + ohead.
|
|
defaultBufferSize = 150 * 1024;
|
|
|
|
// This parm controls whether we rotate through the output sockets
|
|
// when deciding where to send response messages, or whether to simply
|
|
// send the response to the socket of origin. Should normally be set
|
|
// to 'y', for install types 1 and 3.
|
|
string strVal = cf->getConfig(primitiveServers, "RotatingDestination");
|
|
// XXX: Permanently disable for now...
|
|
strVal = "N";
|
|
|
|
if ((strVal == "y") || (strVal == "Y"))
|
|
{
|
|
rotatingDestination = true;
|
|
|
|
// Disable destination rotation if UM and PM are running on same
|
|
// server, because we could accidentally end up sending DMLProc
|
|
// responses to ExeMgr and vice versa, if we rotated socket dest.
|
|
temp = toInt(cf->getConfig("Installation", "ServerTypeInstall"));
|
|
|
|
if ((temp == oam::INSTALL_COMBINE_DM_UM_PM) || (temp == oam::INSTALL_COMBINE_PM_UM))
|
|
rotatingDestination = false;
|
|
}
|
|
|
|
string strBlockPct = cf->getConfig(dbbc, "NumBlocksPct");
|
|
temp = atoi(strBlockPct.c_str());
|
|
|
|
bool absCache = false;
|
|
if (temp > 0)
|
|
{
|
|
BRPBlocksPct = temp;
|
|
/* MCOL-1847. Did the user specify this as an absolute? */
|
|
int len = strBlockPct.length();
|
|
if ((strBlockPct[len - 1] >= 'a' && strBlockPct[len - 1] <= 'z') ||
|
|
(strBlockPct[len - 1] >= 'A' && strBlockPct[len - 1] <= 'Z'))
|
|
{
|
|
absCache = true;
|
|
BRPBlocksPct = Config::fromText(strBlockPct);
|
|
}
|
|
}
|
|
if (absCache)
|
|
BRPBlocks = BRPBlocksPct / 8192;
|
|
else
|
|
BRPBlocks = ((BRPBlocksPct / 100.0) * (double)cg.getTotalMemory()) / 8192;
|
|
#if 0
|
|
temp = toInt(cf->getConfig(dbbc, "NumThreads"));
|
|
|
|
if (temp > 0)
|
|
BRPThreads = temp;
|
|
|
|
#endif
|
|
temp = toInt(cf->getConfig(dbbc, "NumCaches"));
|
|
|
|
if (temp > 0)
|
|
cacheCount = temp;
|
|
|
|
temp = toInt(cf->getConfig(dbbc, "NumDeleteBlocks"));
|
|
|
|
if (temp > 0)
|
|
deleteBlocks = temp;
|
|
|
|
if ((uint32_t)(.01 * BRPBlocks) < deleteBlocks)
|
|
deleteBlocks = (uint32_t)(.01 * BRPBlocks);
|
|
|
|
temp = toInt(cf->getConfig(primitiveServers, "ColScanBufferSizeBlocks"));
|
|
|
|
if (temp > (int)MaxReadAheadSz || temp < 1)
|
|
maxBlocksPerRead = MaxReadAheadSz;
|
|
else if (temp > 0)
|
|
maxBlocksPerRead = temp;
|
|
|
|
temp = toInt(cf->getConfig(primitiveServers, "ColScanReadAheadBlocks"));
|
|
|
|
if (temp > (int)MaxReadAheadSz || temp < 0)
|
|
blocksReadAhead = MaxReadAheadSz;
|
|
else if (temp > 0)
|
|
{
|
|
// make sure we've got an integral factor of extent size
|
|
for (; (MaxExtentSize % temp) != 0; ++temp)
|
|
;
|
|
|
|
blocksReadAhead = temp;
|
|
}
|
|
|
|
temp = toInt(cf->getConfig(primitiveServers, "PTTrace"));
|
|
|
|
if (temp > 0)
|
|
PTTrace = true;
|
|
|
|
temp = toInt(cf->getConfig(primitiveServers, "PrefetchThreshold"));
|
|
|
|
if (temp < 0 || temp > 100)
|
|
prefetchThreshold = 0;
|
|
else
|
|
prefetchThreshold = temp / 100.0;
|
|
|
|
int maxPct = 0; // disable by default
|
|
temp = toInt(cf->getConfig(primitiveServers, "MaxPct"));
|
|
|
|
if (temp >= 0)
|
|
maxPct = temp;
|
|
|
|
// We could use this same mechanism for other growing buffers.
|
|
int aggPct = 95;
|
|
temp = toInt(cf->getConfig("SystemConfig", "MemoryCheckPercent"));
|
|
|
|
if (temp >= 0)
|
|
aggPct = temp;
|
|
|
|
//...Start the thread to monitor our memory usage
|
|
new boost::thread(utils::MonitorProcMem(maxPct, aggPct, 28));
|
|
|
|
// config file priority is 40..1 (highest..lowest)
|
|
string sPriority = cf->getConfig(primitiveServers, "Priority");
|
|
|
|
if (sPriority.length() > 0)
|
|
temp = toInt(sPriority);
|
|
else
|
|
temp = 21;
|
|
|
|
// convert config file value to setpriority(2) value (-20..19, -1 is the default)
|
|
if (temp > 0)
|
|
priority = 20 - temp;
|
|
else if (temp < 0)
|
|
priority = 19;
|
|
|
|
if (priority < -20)
|
|
priority = -20;
|
|
|
|
setpriority(PRIO_PROCESS, 0, priority);
|
|
//..Instantiate UmSocketSelector singleton. Disable rotating destination
|
|
//..selection if no UM IP addresses are in the Calpo67108864LLnt.xml file.
|
|
UmSocketSelector* pUmSocketSelector = UmSocketSelector::instance();
|
|
|
|
if (rotatingDestination)
|
|
{
|
|
if (pUmSocketSelector->ipAddressCount() < 1)
|
|
rotatingDestination = false;
|
|
}
|
|
|
|
// See if we want to override the calculated #cores
|
|
temp = toInt(cf->getConfig(primitiveServers, "NumCores"));
|
|
|
|
if (temp > 0)
|
|
configNumCores = temp;
|
|
|
|
if (configNumCores <= 0)
|
|
{
|
|
// count the actual #cores
|
|
|
|
numCores = cg.getNumCores();
|
|
|
|
if (numCores == 0)
|
|
numCores = 8;
|
|
}
|
|
else
|
|
numCores = configNumCores;
|
|
|
|
// based on the #cores, calculate some thread parms
|
|
if (numCores > 0)
|
|
{
|
|
BRPThreads = 2 * numCores;
|
|
// there doesn't seem much benefit to having more than this, and sometimes it causes problems.
|
|
// DBBC.NumThreads can override this cap
|
|
BRPThreads = std::min(BRPThreads, 32);
|
|
}
|
|
|
|
// the default is ~10% low, 30% medium, 60% high, (where 2*cores = 100%)
|
|
if (highPriorityPercentage == 0 && medPriorityPercentage == 0 && lowPriorityPercentage == 0)
|
|
{
|
|
lowPriorityThreads = max(1, (2 * numCores) / 10);
|
|
medPriorityThreads = max(1, (2 * numCores) / 3);
|
|
highPriorityThreads = (2 * numCores) - lowPriorityThreads - medPriorityThreads;
|
|
}
|
|
else
|
|
{
|
|
uint32_t totalThreads =
|
|
(uint32_t)((lowPriorityPercentage + medPriorityPercentage + highPriorityPercentage) / 100.0 *
|
|
(2 * numCores));
|
|
|
|
if (totalThreads == 0)
|
|
totalThreads = 1;
|
|
|
|
lowPriorityThreads = (uint32_t)(lowPriorityPercentage / 100.0 * (2 * numCores));
|
|
medPriorityThreads = (uint32_t)(medPriorityPercentage / 100.0 * (2 * numCores));
|
|
highPriorityThreads = totalThreads - lowPriorityThreads - medPriorityThreads;
|
|
}
|
|
|
|
BPPCount = highPriorityThreads + medPriorityThreads + lowPriorityThreads;
|
|
|
|
// let the user override if they want
|
|
temp = toInt(cf->getConfig(primitiveServers, "BPPCount"));
|
|
|
|
if (temp > 0 && temp < (int)BPPCount)
|
|
BPPCount = temp;
|
|
|
|
temp = toInt(cf->getConfig(dbbc, "NumThreads"));
|
|
|
|
if (temp > 0)
|
|
BRPThreads = temp;
|
|
|
|
// @bug4598, switch for O_DIRECT to support gluster fs.
|
|
// directIOFlag == O_DIRECT, by default
|
|
strVal = cf->getConfig(primitiveServers, "DirectIO");
|
|
|
|
if ((strVal == "n") || (strVal == "N"))
|
|
directIOFlag = 0;
|
|
|
|
|
|
IDBPolicy::configIDBPolicy();
|
|
|
|
// no versionbuffer if using HDFS for performance reason
|
|
if (IDBPolicy::useHdfs())
|
|
noVB = 1;
|
|
|
|
cout << "Starting PrimitiveServer: st = " << serverThreads << ", sq = " << serverQueueSize
|
|
<< ", pw = " << processorWeight << ", pq = " << processorQueueSize << ", nb = " << BRPBlocks
|
|
<< ", nt = " << BRPThreads << ", nc = " << cacheCount << ", ra = " << blocksReadAhead
|
|
<< ", db = " << deleteBlocks << ", mb = " << maxBlocksPerRead << ", rd = " << rotatingDestination
|
|
<< ", tr = " << PTTrace << ", ss = " << PMSmallSide << ", bp = " << BPPCount << endl;
|
|
|
|
PrimitiveServer server(serverThreads, serverQueueSize, processorWeight, processorQueueSize,
|
|
rotatingDestination, BRPBlocks, BRPThreads, cacheCount, maxBlocksPerRead,
|
|
blocksReadAhead, deleteBlocks, PTTrace, prefetchThreshold, PMSmallSide);
|
|
|
|
#ifdef QSIZE_DEBUG
|
|
thread* qszMonThd;
|
|
|
|
if (gDebugLevel >= STATS)
|
|
{
|
|
ofstream* qszLog = new ofstream("/var/log/mariadb/columnstore/trace/ppqsz.dat");
|
|
|
|
if (!qszLog->good())
|
|
{
|
|
qszLog->close();
|
|
delete qszLog;
|
|
qszLog = 0;
|
|
}
|
|
|
|
qszMonThd = new thread(QszMonThd(&server, qszLog));
|
|
}
|
|
|
|
#endif
|
|
|
|
#ifdef DUMP_CACHE_CONTENTS
|
|
{
|
|
// Need to use pthreads API here...
|
|
pthread_t thd1;
|
|
pthread_attr_t attr1;
|
|
pthread_attr_init(&attr1);
|
|
pthread_attr_setdetachstate(&attr1, PTHREAD_CREATE_DETACHED);
|
|
pthread_create(&thd1, &attr1, waitForSIGUSR1, reinterpret_cast<void*>(cacheCount));
|
|
}
|
|
#endif
|
|
|
|
primServerThreadPool = server.getProcessorThreadPool();
|
|
|
|
server.start(this, startupRaceLock);
|
|
|
|
cerr << "server.start() exited!" << endl;
|
|
|
|
// Free up resources allocated by MY_INIT() above.
|
|
my_end(0);
|
|
|
|
return 1;
|
|
}
|
|
|
|
int main(int argc, char** argv)
|
|
{
|
|
if (checkArchitecture() != arcitecture::SSE4_2 && checkArchitecture() != arcitecture::ASIMD)
|
|
{
|
|
std::cerr << "Unsupported CPU architecture. ARM Advanced SIMD or x86_64 SSE4.2 required; aborting. \n";
|
|
return 1;
|
|
}
|
|
Opt opt(argc, argv);
|
|
// Set locale language
|
|
setlocale(LC_ALL, "");
|
|
setlocale(LC_NUMERIC, "C");
|
|
// This is unset due to the way we start it
|
|
program_invocation_short_name = const_cast<char*>("PrimProc");
|
|
|
|
ServicePrimProc::instance()->setOpt(opt);
|
|
return ServicePrimProc::instance()->Run();
|
|
}
|