1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-1052. init_scan() initial implementation.

This commit is contained in:
Roman Nozdrin
2018-03-22 17:12:56 +03:00
parent a03ecb7a8e
commit cff504c8bf
4 changed files with 246 additions and 19 deletions

View File

@ -4957,5 +4957,178 @@ int ha_calpont_impl_rnd_pos(uchar* buf, uchar* pos)
return ER_INTERNAL_ERROR;
}
/*@brief ha_calpont_impl_group_by_init - Get data for MariaDB group_by
pushdown handler
*/
/***********************************************************
* DESCRIPTION:
* Prepare data for group_by_handler::next_row() calls.
* PARAMETERS:
* query - Query structure, that describes the pushdown query.
* table - TABLE pointer The table to save the result set in.
* RETURN:
* 0 if success
* others if something went wrong whilst getting the result set
***********************************************************/
int ha_calpont_impl_group_by_init(Query* query, TABLE* table)
{
//first_row= true;
return(0);
}
/*@brief ha_calpont_impl_group_by_next - Return result set for MariaDB group_by
pushdown handler
*/
/***********************************************************
* DESCRIPTION:
* Return a result record for each group_by_handler::next_row() call.
* PARAMETERS:
* query - Query structure, that describes the pushdown query.
* table - TABLE pointer The table to save the result set in.
* RETURN:
* 0 if success
* HA_ERR_END_OF_FILE if the record set has come to an end
* others if something went wrong whilst getting the result set
***********************************************************/
int ha_calpont_impl_group_by_next(Query* query, TABLE* table)
{
//if (!first_row)
//return(HA_ERR_END_OF_FILE);
//first_row = 0;
//Field *field = *(table->field);
//field->store(5LL, 1);
//field->set_notnull();
//return(0);
THD* thd = current_thd;
/* If this node is the slave, ignore DML to IDB tables */
if (thd->slave_thread && (
thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
return 0;
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ERROR)
return ER_INTERNAL_ERROR;
// @bug 3005
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_SELECT_VTABLE &&
string(table->s->table_name.str).find("$vtable") != 0)
return HA_ERR_END_OF_FILE;
if (((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) ||
((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
return HA_ERR_END_OF_FILE;
// @bug 2547
if (thd->infinidb_vtable.impossibleWhereOnUnion)
return HA_ERR_END_OF_FILE;
// @bug 2232. Basic SP support
// @bug 3939. Only error out for sp with select. Let pass for alter table in sp.
if (thd->infinidb_vtable.call_sp && (thd->lex)->sql_command != SQLCOM_ALTER_TABLE)
{
setError(thd, ER_CHECK_NOT_IMPLEMENTED, "This stored procedure syntax is not supported by Columnstore in this version");
thd->infinidb_vtable.vtable_state = THD::INFINIDB_ERROR;
return ER_INTERNAL_ERROR;
}
if (!thd->infinidb_vtable.cal_conn_info)
thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
// @bug 3078
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
{
if (ci->cal_conn_hndl)
{
// send ExeMgr a signal before cloing the connection
ByteStream msg;
ByteStream::quadbyte qb = 0;
msg << qb;
try
{
ci->cal_conn_hndl->exeMgr->write(msg);
}
catch (...)
{
// cancel query. ignore connection failure.
}
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
return 0;
}
if (ci->alterTableState > 0) return HA_ERR_END_OF_FILE;
cal_table_info ti;
ti = ci->tableMap[table];
int rc = HA_ERR_END_OF_FILE;
if (!ti.tpl_ctx || !ti.tpl_scan_ctx)
{
CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
return ER_INTERNAL_ERROR;
}
idbassert(ti.msTablePtr == table);
try
{
//rc = fetchNextRow(buf, ti, ci);
rc = 0;
}
catch (std::exception& e)
{
string emsg = string("Error while fetching from ExeMgr: ") + e.what();
setError(thd, ER_INTERNAL_ERROR, emsg);
CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
return ER_INTERNAL_ERROR;
}
ci->tableMap[table] = ti;
if (rc != 0 && rc != HA_ERR_END_OF_FILE)
{
string emsg;
// remove this check when all error handling migrated to the new framework.
if (rc >= 1000)
emsg = ti.tpl_scan_ctx->errMsg;
else
{
logging::ErrorCodes errorcodes;
emsg = errorcodes.errorString(rc);
}
setError(thd, ER_INTERNAL_ERROR, emsg);
//setError(thd, ER_INTERNAL_ERROR, "testing");
ci->stats.fErrorNo = rc;
CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
rc = ER_INTERNAL_ERROR;
}
return rc;
}
int ha_calpont_impl_group_by_end(Query* query, TABLE* table)
{
return 0;
}
// vim:sw=4 ts=4: