1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-07 03:22:57 +03:00

fix(CES): Server doesn't terminate MDB client connection that it did previously delivering incomplete result set

This commit is contained in:
drrtuy
2025-04-08 15:30:47 +00:00
parent 6e98ef3037
commit c9f9cf8988
2 changed files with 19 additions and 19 deletions

View File

@@ -20,7 +20,7 @@
#define PREFER_MY_CONFIG_H #define PREFER_MY_CONFIG_H
#include "crossenginestep.h" #include "crossenginestep.h"
#include <unistd.h> #include <unistd.h>
//#define NDEBUG // #define NDEBUG
#include <cassert> #include <cassert>
#include <sstream> #include <sstream>
#include <iomanip> #include <iomanip>
@@ -251,14 +251,10 @@ T CrossEngineStep::convertValueNum(const char* str, const CalpontSystemCatalog::
case CalpontSystemCatalog::USMALLINT: rv = boost::any_cast<uint16_t>(anyVal); break; case CalpontSystemCatalog::USMALLINT: rv = boost::any_cast<uint16_t>(anyVal); break;
case CalpontSystemCatalog::MEDINT: case CalpontSystemCatalog::MEDINT:
case CalpontSystemCatalog::INT: case CalpontSystemCatalog::INT: rv = boost::any_cast<int32_t>(anyVal); break;
rv = boost::any_cast<int32_t>(anyVal);
break;
case CalpontSystemCatalog::UMEDINT: case CalpontSystemCatalog::UMEDINT:
case CalpontSystemCatalog::UINT: case CalpontSystemCatalog::UINT: rv = boost::any_cast<uint32_t>(anyVal); break;
rv = boost::any_cast<uint32_t>(anyVal);
break;
case CalpontSystemCatalog::BIGINT: rv = boost::any_cast<long long>(anyVal); break; case CalpontSystemCatalog::BIGINT: rv = boost::any_cast<long long>(anyVal); break;
@@ -304,7 +300,7 @@ T CrossEngineStep::convertValueNum(const char* str, const CalpontSystemCatalog::
{ {
if (nullFlag) if (nullFlag)
{ {
rv = joblist::CHAR8NULL; // SZ: I hate that. rv = joblist::CHAR8NULL; // SZ: I hate that.
} }
else else
{ {
@@ -389,7 +385,7 @@ void CrossEngineStep::execute()
if (ret != 0) if (ret != 0)
mysql->handleMySqlError(mysql->getError().c_str(), ret); mysql->handleMySqlError(mysql->getError().c_str(), ret);
std::string query(makeQuery()); std::string query = makeQuery();
fLogger->logMessage(logging::LOG_TYPE_INFO, "QUERY to foreign engine: " + query); fLogger->logMessage(logging::LOG_TYPE_INFO, "QUERY to foreign engine: " + query);
if (traceOn()) if (traceOn())
@@ -429,11 +425,10 @@ void CrossEngineStep::execute()
addRow(rgDataDelivered); addRow(rgDataDelivered);
} }
} }
else if (doFE1 && !doFE3) // FE in WHERE clause only else if (doFE1 && !doFE3) // FE in WHERE clause only
{ {
std::shared_ptr<uint8_t[]> rgDataFe1; // functions in where clause std::shared_ptr<uint8_t[]> rgDataFe1; // functions in where clause
Row rowFe1; // row for fe evaluation Row rowFe1; // row for fe evaluation
fRowGroupFe1.initRow(&rowFe1, true); fRowGroupFe1.initRow(&rowFe1, true);
rgDataFe1.reset(new uint8_t[rowFe1.getSize()]); rgDataFe1.reset(new uint8_t[rowFe1.getSize()]);
rowFe1.setData(rowgroup::Row::Pointer(rgDataFe1.get())); rowFe1.setData(rowgroup::Row::Pointer(rgDataFe1.get()));
@@ -480,11 +475,10 @@ void CrossEngineStep::execute()
addRow(rgDataDelivered); addRow(rgDataDelivered);
} }
} }
else if (!doFE1 && doFE3) // FE in SELECT clause only else if (!doFE1 && doFE3) // FE in SELECT clause only
{ {
std::shared_ptr<uint8_t[]> rgDataFe3; // functions in select clause std::shared_ptr<uint8_t[]> rgDataFe3; // functions in select clause
Row rowFe3; // row for fe evaluation Row rowFe3; // row for fe evaluation
fRowGroupOut.initRow(&rowFe3, true); fRowGroupOut.initRow(&rowFe3, true);
rgDataFe3.reset(new uint8_t[rowFe3.getSize()]); rgDataFe3.reset(new uint8_t[rowFe3.getSize()]);
rowFe3.setData(rowgroup::Row::Pointer(rgDataFe3.get())); rowFe3.setData(rowgroup::Row::Pointer(rgDataFe3.get()));
@@ -501,17 +495,16 @@ void CrossEngineStep::execute()
addRow(rgDataDelivered); addRow(rgDataDelivered);
} }
} }
else // FE in SELECT clause, FE join and WHERE clause else // FE in SELECT clause, FE join and WHERE clause
{ {
std::shared_ptr<uint8_t[]> rgDataFe1; // functions in where clause std::shared_ptr<uint8_t[]> rgDataFe1; // functions in where clause
Row rowFe1; // row for fe1 evaluation Row rowFe1; // row for fe1 evaluation
fRowGroupFe1.initRow(&rowFe1, true); fRowGroupFe1.initRow(&rowFe1, true);
rgDataFe1.reset(new uint8_t[rowFe1.getSize()]); rgDataFe1.reset(new uint8_t[rowFe1.getSize()]);
rowFe1.setData(rowgroup::Row::Pointer(rgDataFe1.get())); rowFe1.setData(rowgroup::Row::Pointer(rgDataFe1.get()));
std::shared_ptr<uint8_t[]> rgDataFe3; // functions in select clause std::shared_ptr<uint8_t[]> rgDataFe3; // functions in select clause
Row rowFe3; // row for fe3 evaluation Row rowFe3; // row for fe3 evaluation
fRowGroupOut.initRow(&rowFe3, true); fRowGroupOut.initRow(&rowFe3, true);
rgDataFe3.reset(new uint8_t[rowFe3.getSize()]); rgDataFe3.reset(new uint8_t[rowFe3.getSize()]);
rowFe3.setData(rowgroup::Row::Pointer(rgDataFe3.get())); rowFe3.setData(rowgroup::Row::Pointer(rgDataFe3.get()));
@@ -562,7 +555,6 @@ void CrossEngineStep::execute()
} }
} }
// INSERT_ADAPTER(fOutputDL, rgDataDelivered);
fOutputDL->insert(rgDataDelivered); fOutputDL->insert(rgDataDelivered);
fRowsRetrieved = mysql->getRowCount(); fRowsRetrieved = mysql->getRowCount();
} }
@@ -801,8 +793,7 @@ void CrossEngineStep::formatMiniStats()
{ {
ostringstream oss; ostringstream oss;
oss << "CES " oss << "CES "
<< "UM " << "UM " << tableAlias() << " "
<< tableAlias() << " "
<< "- " << "- "
<< "- " << "- "
<< "- " << "- "

View File

@@ -88,6 +88,15 @@ int LibMySQL::init(const char* h, unsigned int p, const char* u, const char* w,
ret = -1; ret = -1;
} }
static const std::string extendNetTimeoutQuery = "SET SESSION net_write_timeout = 3600";
if (mysql_real_query(fCon, extendNetTimeoutQuery.c_str(), extendNetTimeoutQuery.length()) != 0)
{
fErrStr = "fatal error setting net_write_timeout=3600 in libmysql_client lib";
ret = -1;
return ret;
}
return ret; return ret;
} }