mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
* fix(aggregation, disk-based): MCOL-5689 this fixes disk-based distinct aggregation functions Previously disk-based distinct aggregation functions produced incorrect results b/c there was no finalization applied for previous generations stored on disk. * fix(aggregation, disk-based): Fix disk-based COUNT(DISTINCT ...) queries. (Case 2). (Distinct & Multi-Distinct, Single- & Multi-Threaded). * fix(aggregation, disk-based): Fix disk-based DISTINCT & GROUP BY queries. (Case 1). (Distinct & Multi-Distinct, Single- & Multi-Threaded). --------- Co-authored-by: Theresa Hradilak <theresa.hradilak@gmail.com> Co-authored-by: Roman Nozdrin <rnozdrin@mariadb.com>
461 lines
11 KiB
C++
461 lines
11 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/*****************************************************************************
|
|
* $Id: sessionmanagerserver.cpp 1906 2013-06-14 19:15:32Z rdempsey $
|
|
*
|
|
****************************************************************************/
|
|
|
|
/*
|
|
* This class issues Transaction ID and keeps track of the current version ID
|
|
*/
|
|
|
|
#include <sys/types.h>
|
|
#include <sys/stat.h>
|
|
#include <cerrno>
|
|
#include <fcntl.h>
|
|
#include <mutex>
|
|
#include <unistd.h>
|
|
|
|
#include <iostream>
|
|
#include <string>
|
|
#include <stdexcept>
|
|
#include <limits>
|
|
#include <unordered_set>
|
|
using namespace std;
|
|
|
|
#include <boost/thread/mutex.hpp>
|
|
#include <boost/scoped_ptr.hpp>
|
|
using namespace boost;
|
|
|
|
#include "brmtypes.h"
|
|
#include "calpontsystemcatalog.h"
|
|
using namespace execplan;
|
|
|
|
#include "configcpp.h"
|
|
#include "atomicops.h"
|
|
|
|
#define SESSIONMANAGERSERVER_DLLEXPORT
|
|
#include "sessionmanagerserver.h"
|
|
#undef SESSIONMANAGERSERVER_DLLEXPORT
|
|
|
|
#ifndef O_BINARY
|
|
#define O_BINARY 0
|
|
#endif
|
|
#ifndef O_DIRECT
|
|
#define O_DIRECT 0
|
|
#endif
|
|
#ifndef O_LARGEFILE
|
|
#define O_LARGEFILE 0
|
|
#endif
|
|
#ifndef O_NOATIME
|
|
#define O_NOATIME 0
|
|
#endif
|
|
|
|
#include "IDBDataFile.h"
|
|
#include "IDBPolicy.h"
|
|
using namespace idbdatafile;
|
|
|
|
namespace BRM
|
|
{
|
|
const uint32_t SessionManagerServer::SS_READY = 1 << 0; // Set by dmlProc one time when dmlProc is ready
|
|
const uint32_t SessionManagerServer::SS_SUSPENDED =
|
|
1 << 1; // Set by console when the system has been suspended by user.
|
|
const uint32_t SessionManagerServer::SS_SUSPEND_PENDING =
|
|
1 << 2; // Set by console when user wants to suspend, but writing is occuring.
|
|
const uint32_t SessionManagerServer::SS_SHUTDOWN_PENDING =
|
|
1 << 3; // Set by console when user wants to shutdown, but writing is occuring.
|
|
const uint32_t SessionManagerServer::SS_ROLLBACK =
|
|
1 << 4; // In combination with a PENDING flag, force a rollback as soom as possible.
|
|
const uint32_t SessionManagerServer::SS_FORCE =
|
|
1 << 5; // In combination with a PENDING flag, force a shutdown without rollback.
|
|
const uint32_t SessionManagerServer::SS_QUERY_READY =
|
|
1 << 6; // Set by ProcManager when system is ready for queries
|
|
|
|
SessionManagerServer::SessionManagerServer() : unique32(0), unique64(0)
|
|
{
|
|
config::Config* conf;
|
|
string stmp;
|
|
const char* ctmp;
|
|
|
|
conf = config::Config::makeConfig();
|
|
|
|
try
|
|
{
|
|
stmp = conf->getConfig("SessionManager", "MaxConcurrentTransactions");
|
|
}
|
|
catch (const std::exception& e)
|
|
{
|
|
cout << e.what() << endl;
|
|
stmp.clear();
|
|
}
|
|
|
|
if (stmp != "")
|
|
{
|
|
int64_t tmp;
|
|
ctmp = stmp.c_str();
|
|
tmp = config::Config::fromText(ctmp);
|
|
|
|
if (tmp < 1)
|
|
maxTxns = 1;
|
|
else
|
|
maxTxns = static_cast<int>(tmp);
|
|
}
|
|
else
|
|
maxTxns = 1;
|
|
|
|
txnidFilename = conf->getConfig("SessionManager", "TxnIDFile");
|
|
|
|
semValue = maxTxns;
|
|
_verID = 0;
|
|
_sysCatVerID = 0;
|
|
systemState = 0;
|
|
|
|
try
|
|
{
|
|
loadState();
|
|
}
|
|
catch (...)
|
|
{
|
|
// first-time run most likely, ignore the error
|
|
}
|
|
}
|
|
|
|
SessionManagerServer::~SessionManagerServer()
|
|
{
|
|
}
|
|
void SessionManagerServer::reset()
|
|
{
|
|
mutex.try_lock();
|
|
semValue = maxTxns;
|
|
condvar.notify_all();
|
|
activeTxns.clear();
|
|
mutex.unlock();
|
|
}
|
|
|
|
void SessionManagerServer::loadState()
|
|
{
|
|
int lastTxnID;
|
|
int err;
|
|
int lastSysCatVerId;
|
|
|
|
again:
|
|
|
|
// There are now 3 pieces of info stored in the txnidfd file: last
|
|
// transaction id, last system catalog version id, and the
|
|
// system state flags. All these values are stored in shared, an
|
|
// instance of struct Overlay.
|
|
// If we fail to read a full four bytes for any value, then the
|
|
// value isn't in the file, and we start with the default.
|
|
|
|
if (IDBPolicy::exists(txnidFilename.c_str()))
|
|
{
|
|
scoped_ptr<IDBDataFile> txnidfp(IDBDataFile::open(
|
|
IDBPolicy::getType(txnidFilename.c_str(), IDBPolicy::WRITEENG), txnidFilename.c_str(), "rb", 0));
|
|
|
|
if (!txnidfp)
|
|
{
|
|
perror("SessionManagerServer(): open");
|
|
throw runtime_error("SessionManagerServer: Could not open the transaction ID file");
|
|
}
|
|
|
|
// Last transaction id
|
|
txnidfp->seek(0, SEEK_SET);
|
|
err = txnidfp->read(&lastTxnID, 4);
|
|
|
|
if (err < 0 && errno != EINTR)
|
|
{
|
|
perror("Sessionmanager::initSegment(): read");
|
|
throw runtime_error("SessionManagerServer: read failed, aborting");
|
|
}
|
|
else if (err < 0)
|
|
goto again;
|
|
else if (err == sizeof(int))
|
|
_verID = lastTxnID;
|
|
|
|
// last system catalog version id
|
|
err = txnidfp->read(&lastSysCatVerId, 4);
|
|
|
|
if (err < 0 && errno != EINTR)
|
|
{
|
|
perror("Sessionmanager::initSegment(): read");
|
|
throw runtime_error("SessionManagerServer: read failed, aborting");
|
|
}
|
|
else if (err < 0)
|
|
goto again;
|
|
else if (err == sizeof(int))
|
|
_sysCatVerID = lastSysCatVerId;
|
|
|
|
// System state. Contains flags regarding the suspend state of the system.
|
|
err = txnidfp->read(&systemState, 4);
|
|
|
|
if (err < 0 && errno == EINTR)
|
|
{
|
|
goto again;
|
|
}
|
|
else if (err == sizeof(int))
|
|
{
|
|
// Turn off the pending and force flags. They make no sense for a clean start.
|
|
// Turn off the ready flag. DMLProc will set it back on when
|
|
// initialized.
|
|
systemState &=
|
|
~(SS_READY | SS_QUERY_READY | SS_SUSPEND_PENDING | SS_SHUTDOWN_PENDING | SS_ROLLBACK | SS_FORCE);
|
|
}
|
|
else
|
|
{
|
|
// else no problem. System state wasn't saved. Might be an upgraded system.
|
|
systemState = 0;
|
|
}
|
|
}
|
|
}
|
|
|
|
/* Save the systemState flags of the Overlay
|
|
* segment. This is saved in the third
|
|
* word of txnid File
|
|
*/
|
|
void SessionManagerServer::saveSystemState()
|
|
{
|
|
saveSMTxnIDAndState();
|
|
}
|
|
|
|
const QueryContext SessionManagerServer::verID()
|
|
{
|
|
QueryContext ret;
|
|
|
|
boost::mutex::scoped_lock lk(mutex);
|
|
ret.currentScn = _verID;
|
|
|
|
for (iterator i = activeTxns.begin(); i != activeTxns.end(); ++i)
|
|
ret.currentTxns->push_back(i->second);
|
|
|
|
return ret;
|
|
}
|
|
|
|
const QueryContext SessionManagerServer::sysCatVerID()
|
|
{
|
|
QueryContext ret;
|
|
|
|
boost::mutex::scoped_lock lk(mutex);
|
|
ret.currentScn = _sysCatVerID;
|
|
|
|
for (iterator i = activeTxns.begin(); i != activeTxns.end(); ++i)
|
|
ret.currentTxns->push_back(i->second);
|
|
|
|
return ret;
|
|
}
|
|
|
|
uint32_t SessionManagerServer::newCpimportJob()
|
|
{
|
|
std::scoped_lock lk(cpimportMutex);
|
|
activeCpimportJobs.insert(cpimportJobId);
|
|
auto ret = cpimportJobId;
|
|
++cpimportJobId;
|
|
return ret;
|
|
}
|
|
|
|
void SessionManagerServer::finishCpimortJob(uint32_t jobId)
|
|
{
|
|
std::scoped_lock lk(cpimportMutex);
|
|
if (activeCpimportJobs.count(jobId))
|
|
activeCpimportJobs.erase(jobId);
|
|
}
|
|
|
|
void SessionManagerServer::clearAllCpimportJobs()
|
|
{
|
|
std::scoped_lock lk(cpimportMutex);
|
|
activeCpimportJobs.clear();
|
|
}
|
|
|
|
const TxnID SessionManagerServer::newTxnID(const SID session, bool block, bool isDDL)
|
|
{
|
|
TxnID ret; // ctor must set valid = false
|
|
iterator it;
|
|
|
|
boost::mutex::scoped_lock lk(mutex);
|
|
|
|
// if it already has a txn...
|
|
it = activeTxns.find(session);
|
|
|
|
if (it != activeTxns.end())
|
|
{
|
|
ret.id = it->second;
|
|
ret.valid = true;
|
|
return ret;
|
|
}
|
|
|
|
if (!block && semValue == 0)
|
|
return ret;
|
|
else
|
|
while (semValue == 0)
|
|
condvar.wait(lk);
|
|
|
|
semValue--;
|
|
idbassert(semValue <= (uint32_t)maxTxns);
|
|
|
|
ret.id = ++_verID;
|
|
ret.valid = true;
|
|
activeTxns[session] = ret.id;
|
|
|
|
if (isDDL)
|
|
++_sysCatVerID;
|
|
|
|
saveSMTxnIDAndState();
|
|
|
|
return ret;
|
|
}
|
|
|
|
void SessionManagerServer::finishTransaction(TxnID& txn)
|
|
{
|
|
iterator it;
|
|
boost::mutex::scoped_lock lk(mutex);
|
|
bool found = false;
|
|
|
|
if (!txn.valid)
|
|
throw invalid_argument("SessionManagerServer::finishTransaction(): transaction is invalid");
|
|
|
|
for (it = activeTxns.begin(); it != activeTxns.end();)
|
|
{
|
|
if (it->second == txn.id)
|
|
{
|
|
activeTxns.erase(it++);
|
|
txn.valid = false;
|
|
found = true;
|
|
// we could probably break at this point, but there won't be that many active txns, and,
|
|
// even though it'd be an error to have multiple entries for the same txn, we might
|
|
// well just get rid of them...
|
|
}
|
|
else
|
|
++it;
|
|
}
|
|
|
|
if (found)
|
|
{
|
|
semValue++;
|
|
idbassert(semValue <= (uint32_t)maxTxns);
|
|
condvar.notify_one();
|
|
}
|
|
else
|
|
throw invalid_argument("SessionManagerServer::finishTransaction(): transaction doesn't exist");
|
|
}
|
|
|
|
const TxnID SessionManagerServer::getTxnID(const SID session)
|
|
{
|
|
TxnID ret;
|
|
iterator it;
|
|
|
|
boost::mutex::scoped_lock lk(mutex);
|
|
|
|
it = activeTxns.find(session);
|
|
|
|
if (it != activeTxns.end())
|
|
{
|
|
ret.id = it->second;
|
|
ret.valid = true;
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
std::shared_ptr<SIDTIDEntry[]> SessionManagerServer::SIDTIDMap(int& len)
|
|
{
|
|
int j;
|
|
std::shared_ptr<SIDTIDEntry[]> ret;
|
|
boost::mutex::scoped_lock lk(mutex);
|
|
iterator it;
|
|
|
|
ret.reset(new SIDTIDEntry[activeTxns.size()]);
|
|
|
|
len = activeTxns.size();
|
|
|
|
for (it = activeTxns.begin(), j = 0; it != activeTxns.end(); ++it, ++j)
|
|
{
|
|
ret[j].sessionid = it->first;
|
|
ret[j].txnid.id = it->second;
|
|
ret[j].txnid.valid = true;
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
void SessionManagerServer::setSystemState(uint32_t state)
|
|
{
|
|
boost::mutex::scoped_lock lk(mutex);
|
|
|
|
systemState |= state;
|
|
saveSystemState();
|
|
}
|
|
|
|
void SessionManagerServer::clearSystemState(uint32_t state)
|
|
{
|
|
boost::mutex::scoped_lock lk(mutex);
|
|
|
|
systemState &= ~state;
|
|
saveSystemState();
|
|
}
|
|
|
|
uint32_t SessionManagerServer::getCpimportJobsCount()
|
|
{
|
|
std::scoped_lock lk(cpimportMutex);
|
|
return activeCpimportJobs.size();
|
|
}
|
|
|
|
uint32_t SessionManagerServer::getTxnCount()
|
|
{
|
|
boost::mutex::scoped_lock lk(mutex);
|
|
return activeTxns.size();
|
|
}
|
|
|
|
void SessionManagerServer::saveSMTxnIDAndState()
|
|
{
|
|
// caller holds the lock
|
|
scoped_ptr<IDBDataFile> txnidfp(IDBDataFile::open(
|
|
IDBPolicy::getType(txnidFilename.c_str(), IDBPolicy::WRITEENG), txnidFilename.c_str(), "wb", 0));
|
|
|
|
if (!txnidfp)
|
|
{
|
|
perror("SessionManagerServer(): open");
|
|
throw runtime_error("SessionManagerServer: Could not open the transaction ID file");
|
|
}
|
|
|
|
int filedata[2];
|
|
filedata[0] = _verID;
|
|
filedata[1] = _sysCatVerID;
|
|
|
|
int err = txnidfp->write(filedata, 8);
|
|
|
|
if (err < 0)
|
|
{
|
|
perror("SessionManagerServer::newTxnID(): write(verid)");
|
|
throw runtime_error("SessionManagerServer::newTxnID(): write(verid) failed");
|
|
}
|
|
|
|
uint32_t lSystemState = systemState;
|
|
// We don't save the pending flags, the force flag or the ready flags.
|
|
lSystemState &= ~(SS_READY | SS_QUERY_READY | SS_SUSPEND_PENDING | SS_SHUTDOWN_PENDING | SS_FORCE);
|
|
err = txnidfp->write(&lSystemState, sizeof(int));
|
|
|
|
if (err < 0)
|
|
{
|
|
perror("SessionManagerServer::saveSystemState(): write(systemState)");
|
|
throw runtime_error("SessionManagerServer::saveSystemState(): write(systemState) failed");
|
|
}
|
|
|
|
txnidfp->flush();
|
|
}
|
|
|
|
} // namespace BRM
|