You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
MCOL-3593 Disabled full optimizer run and enabled copy-pasted simplify_joins.
Disabled 4th if block in buildOuterJoin to handle non-optimized MDB query structures. Broke getSelectPlan into pieces: processFrom, processWhere. MCOL-3593 UNION processing depends on two flags isUnion that comes as arg of getSelectPlan and unionSel that is a local variable in getSelectPlan. Modularization of getSelectPlan broke the mechanizm. This patch is supposed to partially fix it. MCOL-3593 Removed unused if condition from buildOuterJoin that allows unsupported construct subquery in ON expression. Fixed an improper if condition that ignors tableMap entries w/o condition in external_lock thus external_lock doesn't clean up when the query finishes. Fixed wrong logging for queries processed in tableMode. Now rnd_init properly sends queryText down to ExeMgr to be logged. MCOL-3593 Unused attribute FromSubQuery::fFromSub was removed. getSelectPlan has been modularized into: setExecutionParams, processFrom, processWhere. SELECT, HAVING, GROUP BY, ORDER BY still lives in getSelectPlan. Copied optimization function simplify_joins_ into our pushdown code to provide the plugin code with some rewrites from MDB it expects. The columnstore_processing_handlers_fallback session variable has been removed thus CS can't fallback from SH to partial execution paths, e.g. DH, GBH or plugin API. MCOL-3602 Moved MDB optimizer rewrites into a separate file. Add SELECT_LEX::optimize_unflattened_subqueries() call to fix IN into EXISTS rewrite for semi-JOINs with subqueries. disable_indices_for_CEJ() add index related hints to disable index access methods in Cross Engine Joins. create_SH() now flattens JOIN that has both physical tables and views. This fixes most of views related tests in the regression.
This commit is contained in:
@ -413,6 +413,23 @@ int vbin2hex(const uint8_t* p, const unsigned l, char* o)
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Table Map is used by both cond_push and table mode processing
|
||||
// Entries made by cond_push don't have csep though.
|
||||
// When
|
||||
bool onlyOneTableinTM(cal_impl_if::cal_connection_info* ci)
|
||||
{
|
||||
size_t counter = 0;
|
||||
for (auto &tableMapEntry: ci->tableMap)
|
||||
{
|
||||
if (tableMapEntry.second.csep)
|
||||
counter++;
|
||||
if (counter >= 1)
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci, bool handler_flag = false)
|
||||
{
|
||||
int rc = HA_ERR_END_OF_FILE;
|
||||
@ -2441,7 +2458,9 @@ int ha_mcs_impl_rnd_init(TABLE* table)
|
||||
csep = ti.csep;
|
||||
|
||||
// for ExeMgr logging sqltext. only log once for the query although multi plans may be sent
|
||||
if (ci->tableMap.size() == 1)
|
||||
// CS adds the ti into TM in the end of rnd_init thus we log the SQL
|
||||
// only once when there is no ti with csep.
|
||||
if (onlyOneTableinTM(ci))
|
||||
{
|
||||
ti.csep->data(idb_mysql_query_str(thd));
|
||||
}
|
||||
@ -3251,7 +3270,6 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table)
|
||||
|
||||
aCmdLine = aCmdLine + table->s->db.str + " " + table->s->table_name.str ;
|
||||
|
||||
//cout << "aCmdLine = " << aCmdLine << endl;
|
||||
std::istringstream ss(aCmdLine);
|
||||
std::string arg;
|
||||
std::vector<std::string> v2(20, "");
|
||||
@ -3278,19 +3296,6 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table)
|
||||
saAttr.lpSecurityDescriptor = NULL;
|
||||
HANDLE handleList[2];
|
||||
const char* pSectionMsg;
|
||||
// Create a pipe for the child process's STDOUT.
|
||||
#if 0 // We don't need stdout to come back right now.
|
||||
pSectionMsg = "Create Stdout";
|
||||
bSuccess = CreatePipe(&ci->cpimport_stdout_Rd, &ci->cpimport_stdout_Wr, &saAttr, 0);
|
||||
|
||||
// Ensure the read handle to the pipe for STDIN is not inherited.
|
||||
if (bSuccess)
|
||||
{
|
||||
pSectionMsg = "SetHandleInformation(stdout)";
|
||||
bSuccess = SetHandleInformation(ci->cpimport_stdout_Rd, HANDLE_FLAG_INHERIT, 0);
|
||||
}
|
||||
|
||||
#endif
|
||||
bSuccess = true;
|
||||
|
||||
// Create a pipe for the child process's STDIN.
|
||||
@ -3340,10 +3345,8 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table)
|
||||
pSectionMsg = "UpdateProcThreadAttribute";
|
||||
bInitialized = true;
|
||||
handleList[0] = ci->cpimport_stdin_Rd;
|
||||
// handleList[1] = ci->cpimport_stdout_Wr;
|
||||
bSuccess = UpdateProcThreadAttribute(lpAttributeList,
|
||||
0, PROC_THREAD_ATTRIBUTE_HANDLE_LIST,
|
||||
// handleList, 2*sizeof(HANDLE), NULL, NULL);
|
||||
handleList, sizeof(HANDLE), NULL, NULL);
|
||||
}
|
||||
|
||||
@ -3365,8 +3368,6 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table)
|
||||
siStartInfo.lpAttributeList = lpAttributeList;
|
||||
siStartInfo.StartupInfo.hStdError = NULL;
|
||||
siStartInfo.StartupInfo.hStdOutput = NULL;
|
||||
// siStartInfo.StartupInfo.hStdError = ci->cpimport_stdout_Wr;
|
||||
// siStartInfo.StartupInfo.hStdOutput = ci->cpimport_stdout_Wr;
|
||||
siStartInfo.StartupInfo.hStdInput = ci->cpimport_stdin_Rd;
|
||||
siStartInfo.StartupInfo.dwFlags |= STARTF_USESTDHANDLES;
|
||||
// Create the child process.
|
||||
@ -3485,14 +3486,11 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table)
|
||||
{
|
||||
ci->filePtr = fdopen(ci->fdt[1], "w");
|
||||
ci->cpimport_pid = aChPid; // This is the child PID
|
||||
//cout << "Child PID is " << aChPid << endl;
|
||||
close(ci->fdt[0]); //close the READER of PARENT
|
||||
ci->fdt[0] = -1;
|
||||
// now we can send all the data thru FIFO[1], writer of PARENT
|
||||
}
|
||||
|
||||
//if(aChPid == 0)
|
||||
//cout << "******** Child finished its work ********" << endl;
|
||||
#endif
|
||||
}
|
||||
else
|
||||
@ -3500,7 +3498,6 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table)
|
||||
if (!ci->dmlProc)
|
||||
{
|
||||
ci->dmlProc = new MessageQueueClient("DMLProc");
|
||||
//cout << "start_bulk_insert starts a client " << ci->dmlProc << " for session " << thd->thread_id << endl;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -3616,22 +3613,6 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
|
||||
// @bug 2515. Check command intead of vtable state
|
||||
if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT) && !ci->singleInsert )
|
||||
{
|
||||
|
||||
//@Bug 2438. Only load data infile calls last batch process
|
||||
/* if ( ci->isLoaddataInfile && ((thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) || (ci->useCpimport == 0))) {
|
||||
//@Bug 2829 Handle ctrl-C
|
||||
if ( thd->killed > 0 )
|
||||
abort = true;
|
||||
|
||||
if ( !ci->dmlProc )
|
||||
{
|
||||
ci->dmlProc = new MessageQueueClient("DMLProc");
|
||||
//cout << "end_bulk_insert starts a client " << ci->dmlProc << " for session " << thd->thread_id << endl;
|
||||
}
|
||||
rc = ha_mcs_impl_write_last_batch(table, *ci, abort);
|
||||
}
|
||||
else if ((ci->useCpimport > 0) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) && (!ci->singleInsert) && ((ci->isLoaddataInfile) ||
|
||||
} */
|
||||
if ((ci->useCpimport > 0) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) && (!ci->singleInsert) && ((ci->isLoaddataInfile) ||
|
||||
((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) ||
|
||||
((thd->lex)->sql_command == SQLCOM_INSERT_SELECT)) )
|
||||
@ -4076,9 +4057,7 @@ int ha_mcs_impl_external_lock(THD* thd, TABLE* table, int lock_type)
|
||||
CalTableMap::iterator mapiter = ci->tableMap.find(table);
|
||||
// make sure this is a release lock (2nd) call called in
|
||||
// the table mode.
|
||||
if (mapiter != ci->tableMap.end()
|
||||
&& (mapiter->second.condInfo && mapiter->second.csep)
|
||||
&& lock_type == 2)
|
||||
if (mapiter != ci->tableMap.end() && mapiter->second.csep && lock_type == 2)
|
||||
{
|
||||
// table mode
|
||||
if (mapiter->second.conn_hndl)
|
||||
|
Reference in New Issue
Block a user