1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-08 14:22:09 +03:00

Merge branch 'develop' into MCOL-3536

This commit is contained in:
David Hall
2020-05-28 14:20:32 -05:00
28 changed files with 300 additions and 256 deletions

View File

@@ -1,5 +1,5 @@
/* Copyright (C) 2014 InfiniDB, Inc.
* Copyright (C) 2016 MariaDB Corporation.
* Copyright (C) 2016-2020 MariaDB Corporation.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@@ -36,6 +36,8 @@
#include <ctime>
#include <algorithm>
#include <unistd.h>
#include <chrono>
#include <thread>
using namespace std;
#include <boost/scoped_array.hpp>
@@ -240,11 +242,10 @@ void DistributedEngineComm::Setup()
newClients.clear();
newLocks.clear();
throttleThreshold = fRm->getDECThrottleThreshold();
uint32_t newPmCount = fRm->getPsCount();
int cpp = (fIsExeMgr ? fRm->getPsConnectionsPerPrimProc() : 1);
throttleThreshold = fRm->getDECThrottleThreshold();
tbpsThreadCount = fRm->getJlNumScanReceiveThreads();
unsigned numConnections = newPmCount * cpp;
unsigned numConnections = getNumConnections();
oam::Oam oam;
ModuleTypeConfig moduletypeconfig;
@@ -386,51 +387,59 @@ void DistributedEngineComm::Listen(boost::shared_ptr<MessageQueueClient> client,
Error:
// @bug 488 - error condition! push 0 length bs to messagequeuemap and
// eventually let jobstep error out.
/* boost::mutex::scoped_lock lk(fMlock);
//cout << "WARNING: DEC READ 0 LENGTH BS FROM " << client->otherEnd()<< endl;
boost::mutex::scoped_lock lk(fMlock);
MessageQueueMap::iterator map_tok;
sbs.reset(new ByteStream(0));
MessageQueueMap::iterator map_tok;
sbs.reset(new ByteStream(0));
for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
{
map_tok->second->queue.clear();
(void)atomicops::atomicInc(&map_tok->second->unackedWork[0]);
map_tok->second->queue.push(sbs);
}
lk.unlock();
for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
{
map_tok->second->queue.clear();
(void)atomicops::atomicInc(&map_tok->second->unackedWork[0]);
map_tok->second->queue.push(sbs);
}
lk.unlock();
if (fIsExeMgr)
{
//std::cout << "WARNING: DEC READ 0 LENGTH BS FROM "
// << client->otherEnd()<< " OR GOT AN EXCEPTION READING" << std::endl;
decltype(pmCount) originalPMCount = pmCount;
// Re-establish if a remote PM restarted.
std::this_thread::sleep_for(std::chrono::seconds(3));
Setup();
if (originalPMCount != pmCount)
{
ostringstream os;
os << "DEC: lost connection to " << client->addr2String();
writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_ERROR);
}
// reset the pmconnection vector
ClientList tempConns;
/*
// reset the pmconnection vector
ClientList tempConns;
boost::mutex::scoped_lock onErrLock(fOnErrMutex);
string moduleName = client->moduleName();
//cout << "moduleName=" << moduleName << endl;
for ( uint32_t i = 0; i < fPmConnections.size(); i++)
{
if (moduleName != fPmConnections[i]->moduleName())
tempConns.push_back(fPmConnections[i]);
//else
//cout << "DEC remove PM" << fPmConnections[i]->otherEnd() << " moduleName=" << fPmConnections[i]->moduleName() << endl;
}
{
boost::mutex::scoped_lock onErrLock(fOnErrMutex);
string moduleName = client->moduleName();
//cout << "moduleName=" << moduleName << endl;
for ( uint32_t i = 0; i < fPmConnections.size(); i++)
{
if (moduleName != fPmConnections[i]->moduleName())
tempConns.push_back(fPmConnections[i]);
//else
//cout << "DEC remove PM" << fPmConnections[i]->otherEnd() << " moduleName=" << fPmConnections[i]->moduleName() << endl;
}
if (tempConns.size() == fPmConnections.size()) return;
if (tempConns.size() == fPmConnections.size()) return;
fPmConnections.swap(tempConns);
pmCount = (pmCount == 0 ? 0 : pmCount - 1);
//cout << "PMCOUNT=" << pmCount << endl;
fPmConnections.swap(tempConns);
pmCount = (pmCount == 0 ? 0 : pmCount - 1);
//cout << "PMCOUNT=" << pmCount << endl;
*/
// send alarm & log it
ALARMManager alarmMgr;
string alarmItem = client->addr2String();
alarmItem.append(" PrimProc");
alarmMgr.sendAlarmReport(alarmItem.c_str(), oam::CONN_FAILURE, SET);
ostringstream os;
os << "DEC: lost connection to " << client->addr2String();
writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_CRITICAL);
// }
// log it
ostringstream os;
os << "DEC: lost connection to " << client->addr2String();
writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_CRITICAL);
*/
}
return;
}
@@ -999,22 +1008,22 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin
catch (...)
{
// @bug 488. error out under such condition instead of re-trying other connection,
// by pushing 0 size bytestream to messagequeue and throw excpetion
/* SBS sbs;
lk.lock();
//cout << "WARNING: DEC WRITE BROKEN PIPE. PMS index = " << index << endl;
MessageQueueMap::iterator map_tok;
sbs.reset(new ByteStream(0));
// by pushing 0 size bytestream to messagequeue and throw exception
SBS sbs;
lk.lock();
//std::cout << "WARNING: DEC WRITE BROKEN PIPE. PMS index = " << index << std::endl;
MessageQueueMap::iterator map_tok;
sbs.reset(new ByteStream(0));
for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
{
map_tok->second->queue.clear();
(void)atomicops::atomicInc(&map_tok->second->unackedWork[0]);
map_tok->second->queue.push(sbs);
}
lk.unlock();
for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
{
map_tok->second->queue.clear();
(void)atomicops::atomicInc(&map_tok->second->unackedWork[0]);
map_tok->second->queue.push(sbs);
}
lk.unlock();
/*
// reconfig the connection array
ClientList tempConns;
{
@@ -1033,7 +1042,6 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin
fPmConnections.swap(tempConns);
pmCount = (pmCount == 0 ? 0 : pmCount - 1);
}
*/
// send alarm
ALARMManager alarmMgr;
string alarmItem("UNKNOWN");
@@ -1045,6 +1053,7 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin
alarmItem.append(" PrimProc");
alarmMgr.sendAlarmReport(alarmItem.c_str(), oam::CONN_FAILURE, SET);
*/
throw runtime_error("DistributedEngineComm::write: Broken Pipe error");
}
}

View File

@@ -197,6 +197,12 @@ public:
return pmCount;
}
unsigned getNumConnections() const
{
unsigned cpp = (fIsExeMgr ? fRm->getPsConnectionsPerPrimProc() : 1);
return fRm->getPsCount() * cpp;
}
messageqcpp::Stats getNetworkStats(uint32_t uniqueID);
friend class ::TestDistributedEngineComm;

View File

@@ -294,24 +294,10 @@ extern "C"
try
{
if (getenv("SKIP_OAM_INIT"))
if (dbrm.getSystemReady()
&& dbrm.getSystemQueryReady())
{
if (dbrm.getSystemReady()
&& dbrm.getSystemQueryReady())
{
return 1;
}
}
else
{
oam.getSystemStatus(systemstatus);
if (systemstatus.SystemOpState == ACTIVE
&& dbrm.getSystemReady()
&& dbrm.getSystemQueryReady())
{
return 1;
}
return 1;
}
}
catch (...)

View File

@@ -891,7 +891,11 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
if (current_thd->variables.sql_mode & MODE_PAD_CHAR_TO_FULL_LENGTH)
{
// Pad to the full length of the field
escape.assign((char*)buf, ci.columnTypes[colpos].colWidth);
if (ci.utf8)
escape.assign((char*)buf, ci.columnTypes[colpos].colWidth * 3);
else
escape.assign((char*)buf, ci.columnTypes[colpos].colWidth);
boost::replace_all(escape, "\\", "\\\\");
fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(),
@@ -904,8 +908,10 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
bitmap_set_bit(table->read_set, field->field_index);
String attribute;
field->val_str(&attribute);
escape.assign((char*)buf, attribute.length());
boost::replace_all(escape, "\\", "\\\\");
fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(),
escape.c_str(), ci.enclosed_by, ci.delimiter);
}
@@ -1777,32 +1783,35 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
case CalpontSystemCatalog::BLOB:
case CalpontSystemCatalog::TEXT:
{
// MCOL-4005 Note that we don't handle nulls as a special
// case here as we do for other datatypes, the below works
// as expected for nulls.
uint32_t dataLength = 0;
uintptr_t* dataptr;
uchar* ucharptr;
uint colWidthInBytes = (ci.utf8 ?
ci.columnTypes[colpos].colWidth * 3: ci.columnTypes[colpos].colWidth);
if (ci.columnTypes[colpos].colWidth < 256)
if (colWidthInBytes < 256)
{
dataLength = *(uint8_t*) buf;
buf++;
}
else if (ci.columnTypes[colpos].colWidth < 65536)
else if (colWidthInBytes < 65536)
{
dataLength = *(uint16_t*) buf;
buf = buf + 2 ;
buf += 2;
}
else if (ci.columnTypes[colpos].colWidth < 16777216)
else if (colWidthInBytes < 16777216)
{
dataLength = *(uint16_t*) buf;
buf = buf + 2 ;
if (*(uint8_t*)buf)
dataLength += 256*256*(*(uint8_t*)buf) ;
buf++;
dataLength = *(uint16_t*) buf;
dataLength |= ((int) buf[2]) << 16;
buf += 3;
}
else
{
dataLength = *(uint32_t*) buf;
buf = buf + 4 ;
buf += 4;
}
// buf contains pointer to blob, for example:
@@ -1826,7 +1835,9 @@ int ha_mcs_impl_write_batch_row_(const uchar* buf, TABLE* table, cal_impl_if::ca
else
{
// TEXT Column
fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, dataLength, ucharptr, ci.enclosed_by, ci.delimiter);
escape.assign((char*)ucharptr, dataLength);
boost::replace_all(escape, "\\", "\\\\");
fprintf(ci.filePtr, "%c%.*s%c%c", ci.enclosed_by, (int)escape.length(), escape.c_str(), ci.enclosed_by, ci.delimiter);
}
break;

View File

@@ -1308,7 +1308,7 @@ uint32_t buildOuterJoin(gp_walk_info& gwi, SELECT_LEX& select_lex)
// View is already processed in view::transform
// @bug5319. view is sometimes treated as derived table and
// fromSub::transform does not build outer join filters.
if (!table_ptr->derived && table_ptr->view)
if (!table_ptr->derived && table_ptr->view && !gwi.subQuery)
continue;
CalpontSystemCatalog:: TableAliasName tan = make_aliasview(

View File

@@ -17,7 +17,7 @@ for arg in "$@"; do
done
# DELETE libcalmysql.so entries first as they are in ha_columnstore.so in 1.4.2 onwards
mysql --force --user=root mysql 2> ${tmpdir}/mysql_install.log <<EOD
su -s /bin/sh -c 'mysql' mysql 2> ${tmpdir}/mysql_install.log <<EOD
DELETE FROM mysql.func WHERE dl='libcalmysql.so';
INSERT INTO mysql.func VALUES ('calgetstats',0,'ha_columnstore.so','function');
INSERT INTO mysql.func VALUES ('calsettrace',2,'ha_columnstore.so','function');
@@ -110,9 +110,10 @@ CREATE TABLE IF NOT EXISTS infinidb_querystats.priority
insert ignore into infinidb_querystats.priority values ('High', 100),('Medium', 66), ('Low', 33);
EOD
mysql --user=root mysql 2>/dev/null <@ENGINE_SUPPORTDIR@/syscatalog_mysql.sql
mysql --user=root mysql 2>/dev/null <@ENGINE_SUPPORTDIR@/calsetuserpriority.sql
mysql --user=root mysql 2>/dev/null <@ENGINE_SUPPORTDIR@/calremoveuserpriority.sql
mysql --user=root mysql 2>/dev/null <@ENGINE_SUPPORTDIR@/calshowprocesslist.sql
mysql --user=root mysql 2>/dev/null <@ENGINE_SUPPORTDIR@/columnstore_info.sql
su -s /bin/sh -c 'mysql <@ENGINE_SUPPORTDIR@/syscatalog_mysql.sql' mysql 2>/dev/null
su -s /bin/sh -c 'mysql <@ENGINE_SUPPORTDIR@/calsetuserpriority.sql' mysql 2>/dev/null
su -s /bin/sh -c 'mysql <@ENGINE_SUPPORTDIR@/calremoveuserpriority.sql' mysql 2>/dev/null
su -s /bin/sh -c 'mysql <@ENGINE_SUPPORTDIR@/calshowprocesslist.sql' mysql 2>/dev/null
su -s /bin/sh -c 'mysql <@ENGINE_SUPPORTDIR@/columnstore_info.sql' mysql 2>/dev/null