1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-06-15 03:21:42 +03:00

Simplify PMS connection entries configuration

This commit is contained in:
Roman Nozdrin
2021-04-15 18:56:59 +00:00
parent 7e04032005
commit c9b353e975
3 changed files with 82 additions and 77 deletions

View File

@ -54,6 +54,40 @@ using namespace config;
namespace messageqcpp
{
// Aux function to try to resolve supplied identifier to fill struct addrinfo.
struct sockaddr* hostnameResolver(const std::string& dnOrIp,
const uint16_t port,
logging::Logger& aLogger,
struct sockaddr* sockAddrPtr)
{
struct addrinfo hints;
struct addrinfo *servinfo;
int rc = 0;
memset(&hints, 0, sizeof hints);
// ATM We support IPv4 only.
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
if( !(rc = getaddrinfo(dnOrIp.c_str(), nullptr, &hints, &servinfo)) )
{
memset(sockAddrPtr, 0, sizeof(*sockAddrPtr));
sockaddr_in* sinp = reinterpret_cast<sockaddr_in*>(sockAddrPtr);
*sinp = *reinterpret_cast<sockaddr_in*>(servinfo->ai_addr);
sinp->sin_port = htons(port);
freeaddrinfo(servinfo);
}
else
{
string msg = "messageqcpp::hostnameResolver ";
msg.append(gai_strerror(rc));
logging::Message::Args args;
logging::LoggingID li(31);
args.add(msg);
aLogger.logMessage(logging::LOG_TYPE_ERROR, logging::M0000, args, li);
}
return sockAddrPtr;
}
void MessageQueueServer::setup(size_t blocksize, int backlog, bool syncProto)
{
@ -153,59 +187,38 @@ void MessageQueueClient::shutdown()
fClientSock.close();
}
std::pair<std::string, uint16_t> getAddressAndPort(config::Config* config, const std::string& fOtherEnd)
{
std::string otherEndDnOrIPStr = config->getConfig(fOtherEnd, "IPAddr");
std::string otherEndPortStr = config->getConfig(fOtherEnd, "Port");
uint16_t port = otherEndPortStr.length() > 0
? static_cast<uint16_t>(strtol(otherEndPortStr.c_str(), 0, 0))
: 0;
if (otherEndDnOrIPStr == "unassigned")
return {"0.0.0.0", port};
if (otherEndDnOrIPStr.empty())
return {"127.0.0.1", port};
return {otherEndDnOrIPStr, port};
}
void MessageQueueClient::setup(bool syncProto)
{
string otherEndIPStr;
string otherEndPortStr;
struct addrinfo hints, *servinfo;
int rc = 0;
otherEndIPStr = fConfig->getConfig(fOtherEnd, "IPAddr");
otherEndPortStr = fConfig->getConfig(fOtherEnd, "Port");
if (otherEndIPStr == "unassigned")
{
otherEndIPStr = "0.0.0.0";
}
if (otherEndIPStr.length() == 0) otherEndIPStr = "127.0.0.1";
if (otherEndPortStr.length() == 0 || static_cast<uint16_t>(strtol(otherEndPortStr.c_str(), 0, 0)) == 0)
auto addressAndPort = getAddressAndPort(fConfig, fOtherEnd);
if (!addressAndPort.second)
{
string msg = "MessageQueueClient::setup(): config error: Invalid/Missing Port attribute";
throw runtime_error(msg);
}
memset(&hints, 0, sizeof hints);
// ATM We support IPv4 only.
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
if( !(rc = getaddrinfo(otherEndIPStr.c_str(), otherEndPortStr.c_str(), &hints, &servinfo)) )
{
memset(&fServ_addr, 0, sizeof(fServ_addr));
sockaddr_in* sinp = reinterpret_cast<sockaddr_in*>(&fServ_addr);
*sinp = *reinterpret_cast<sockaddr_in*>(servinfo->ai_addr);
freeaddrinfo(servinfo);
}
else
{
string msg = "MessageQueueClient::setup(): ";
msg.append(gai_strerror(rc));
logging::Message::Args args;
logging::LoggingID li(31);
args.add(msg);
fLogger.logMessage(logging::LOG_TYPE_ERROR, logging::M0000, args, li);
}
#ifdef SKIP_IDB_COMPRESSION
fClientSock.setSocketImpl(new InetStreamSocket());
#else
fClientSock.setSocketImpl(new CompressedInetStreamSocket());
#endif
fClientSock.syncProto(syncProto);
fClientSock.sa(&fServ_addr);
fClientSock.sa(hostnameResolver(addressAndPort.first, addressAndPort.second, fLogger, &fServ_addr));
}
MessageQueueClient::MessageQueueClient(const string& otherEnd, const string& config, bool syncProto) :
@ -226,38 +239,13 @@ MessageQueueClient::MessageQueueClient(const string& otherEnd, Config* config, b
MessageQueueClient::MessageQueueClient(const string& dnOrIp, uint16_t port, bool syncProto) :
fLogger(31), fIsAvailable(true)
{
struct addrinfo hints, *servinfo;
int rc = 0;
memset(&hints, 0, sizeof hints);
// ATM We support IPv4 only.
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
if( !(rc = getaddrinfo(dnOrIp.c_str(), NULL, &hints, &servinfo)) )
{
memset(&fServ_addr, 0, sizeof(fServ_addr));
sockaddr_in* sinp = reinterpret_cast<sockaddr_in*>(&fServ_addr);
*sinp = *reinterpret_cast<sockaddr_in*>(servinfo->ai_addr);
sinp->sin_port = htons(port);
freeaddrinfo(servinfo);
}
else
{
string msg = "MessageQueueClient::MessageQueueClient(): ";
msg.append(gai_strerror(rc));
logging::Message::Args args;
logging::LoggingID li(31);
args.add(msg);
fLogger.logMessage(logging::LOG_TYPE_ERROR, logging::M0000, args, li);
}
#ifdef SKIP_IDB_COMPRESSION
fClientSock.setSocketImpl(new InetStreamSocket());
#else
fClientSock.setSocketImpl(new CompressedInetStreamSocket());
#endif
fClientSock.syncProto(syncProto);
fClientSock.sa(&fServ_addr);
fClientSock.sa(hostnameResolver(dnOrIp, port, fLogger, &fServ_addr));
}
const SBS MessageQueueClient::read(const struct timespec* timeout, bool* isTimeOut, Stats* stats) const