You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
MCOL-1052 Generate and send CSEP to and receive set data from ExeMgr.
This commit is contained in:
@ -276,6 +276,104 @@ void storeNumericField(Field** f, int64_t value, CalpontSystemCatalog::ColType&
|
||||
}
|
||||
}
|
||||
|
||||
void storeNumericFieldGroupBy(Field** f, int64_t value, CalpontSystemCatalog::ColType& ct)
|
||||
{
|
||||
// unset null bit first
|
||||
if ((*f)->null_ptr)
|
||||
*(*f)->null_ptr &= ~(*f)->null_bit;
|
||||
|
||||
// For unsigned, use the ColType returned in the row rather than the
|
||||
// unsigned_flag set by mysql. This is because mysql gets it wrong for SUM()
|
||||
// Hopefully, in all other cases we get it right.
|
||||
switch ((*f)->type())
|
||||
{
|
||||
case MYSQL_TYPE_NEWDECIMAL:
|
||||
{
|
||||
Field_new_decimal* f2 = (Field_new_decimal*)*f;
|
||||
|
||||
// @bug4388 stick to InfiniDB's scale in case mysql gives wrong scale due
|
||||
// to create vtable limitation.
|
||||
if (f2->dec < ct.scale)
|
||||
f2->dec = ct.scale;
|
||||
|
||||
char buf[256];
|
||||
dataconvert::DataConvert::decimalToString(value, (unsigned)ct.scale, buf, 256, ct.colDataType);
|
||||
f2->store(buf, strlen(buf), f2->charset());
|
||||
break;
|
||||
}
|
||||
|
||||
case MYSQL_TYPE_TINY: //TINYINT type
|
||||
{
|
||||
Field_tiny* f2 = (Field_tiny*)*f;
|
||||
longlong int_val = (longlong)value;
|
||||
f2->store(int_val, f2->unsigned_flag);
|
||||
break;
|
||||
}
|
||||
|
||||
case MYSQL_TYPE_SHORT: //SMALLINT type
|
||||
{
|
||||
Field_short* f2 = (Field_short*)*f;
|
||||
longlong int_val = (longlong)value;
|
||||
f2->store(int_val, f2->unsigned_flag);
|
||||
break;
|
||||
}
|
||||
|
||||
case MYSQL_TYPE_LONG: //INT type
|
||||
{
|
||||
Field_long* f2 = (Field_long*)*f;
|
||||
longlong int_val = (longlong)value;
|
||||
f2->store(int_val, f2->unsigned_flag);
|
||||
break;
|
||||
}
|
||||
|
||||
case MYSQL_TYPE_LONGLONG: //BIGINT type
|
||||
{
|
||||
Field_longlong* f2 = (Field_longlong*)*f;
|
||||
longlong int_val = (longlong)value;
|
||||
f2->store(int_val, f2->unsigned_flag);
|
||||
break;
|
||||
}
|
||||
|
||||
case MYSQL_TYPE_FLOAT: // FLOAT type
|
||||
{
|
||||
Field_float* f2 = (Field_float*)*f;
|
||||
float float_val = *(float*)(&value);
|
||||
f2->store(float_val);
|
||||
break;
|
||||
}
|
||||
|
||||
case MYSQL_TYPE_DOUBLE: // DOUBLE type
|
||||
{
|
||||
Field_double* f2 = (Field_double*)*f;
|
||||
double double_val = *(double*)(&value);
|
||||
f2->store(double_val);
|
||||
break;
|
||||
}
|
||||
|
||||
case MYSQL_TYPE_VARCHAR:
|
||||
{
|
||||
Field_varstring* f2 = (Field_varstring*)*f;
|
||||
char tmp[25];
|
||||
|
||||
if (ct.colDataType == CalpontSystemCatalog::DECIMAL)
|
||||
dataconvert::DataConvert::decimalToString(value, (unsigned)ct.scale, tmp, 25, ct.colDataType);
|
||||
else
|
||||
snprintf(tmp, 25, "%ld", value);
|
||||
|
||||
f2->store(tmp, strlen(tmp), f2->charset());
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
Field_longlong* f2 = (Field_longlong*)*f;
|
||||
longlong int_val = (longlong)value;
|
||||
f2->store(int_val, f2->unsigned_flag);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// @bug 2244. Log exception related to lost connection to ExeMgr.
|
||||
// Log exception error from calls to sm::tpl_scan_fetch in fetchNextRow()
|
||||
@ -761,7 +859,7 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci)
|
||||
return rc;
|
||||
}
|
||||
|
||||
int fetchNextRowGrHand(cal_table_info& ti, cal_connection_info* ci)
|
||||
int fetchNextRowGrHandGroupBy(cal_table_info& ti, cal_connection_info* ci)
|
||||
{
|
||||
int rc = HA_ERR_END_OF_FILE;
|
||||
int num_attr = ti.msTablePtr->s->fields;
|
||||
@ -1193,6 +1291,442 @@ int fetchNextRowGrHand(cal_table_info& ti, cal_connection_info* ci)
|
||||
return rc;
|
||||
}
|
||||
|
||||
int fetchNextRowGrHand(cal_table_info& ti, cal_connection_info* ci)
|
||||
{
|
||||
int rc = HA_ERR_END_OF_FILE;
|
||||
int num_attr = ti.msTablePtr->s->fields;
|
||||
sm::status_t sm_stat;
|
||||
|
||||
try
|
||||
{
|
||||
if (ti.conn_hndl)
|
||||
{
|
||||
sm_stat = sm::tpl_scan_fetch(ti.tpl_scan_ctx, ti.conn_hndl);
|
||||
}
|
||||
else if (ci->cal_conn_hndl)
|
||||
{
|
||||
sm_stat = sm::tpl_scan_fetch(ti.tpl_scan_ctx, ci->cal_conn_hndl, (int*)(¤t_thd->killed));
|
||||
}
|
||||
else
|
||||
throw runtime_error("internal error");
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
// @bug 2244. Always log this msg for now, as we try to track down when/why we are
|
||||
// losing socket connection with ExeMgr
|
||||
//#ifdef INFINIDB_DEBUG
|
||||
tpl_scan_fetch_LogException( ti, ci, &ex);
|
||||
//#endif
|
||||
sm_stat = sm::CALPONT_INTERNAL_ERROR;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
// @bug 2244. Always log this msg for now, as we try to track down when/why we are
|
||||
// losing socket connection with ExeMgr
|
||||
//#ifdef INFINIDB_DEBUG
|
||||
tpl_scan_fetch_LogException( ti, ci, 0 );
|
||||
//#endif
|
||||
sm_stat = sm::CALPONT_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
if (sm_stat == sm::STATUS_OK)
|
||||
{
|
||||
Field** f;
|
||||
f = ti.msTablePtr->field;
|
||||
//set all fields to null in null col bitmap
|
||||
//memset(buf, -1, ti.msTablePtr->s->null_bytes);
|
||||
std::vector<CalpontSystemCatalog::ColType>& colTypes = ti.tpl_scan_ctx->ctp;
|
||||
int64_t intColVal = 0;
|
||||
uint64_t uintColVal = 0;
|
||||
char tmp[256];
|
||||
|
||||
RowGroup* rowGroup = ti.tpl_scan_ctx->rowGroup;
|
||||
|
||||
// table mode mysql expects all columns of the table. mapping between columnoid and position in rowgroup
|
||||
// set coltype.position to be the position in rowgroup. only set once.
|
||||
if (ti.tpl_scan_ctx->rowsreturned == 0 &&
|
||||
(ti.tpl_scan_ctx->traceFlags & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF))
|
||||
{
|
||||
for (uint32_t i = 0; i < rowGroup->getColumnCount(); i++)
|
||||
{
|
||||
int oid = rowGroup->getOIDs()[i];
|
||||
int j = 0;
|
||||
|
||||
for (; j < num_attr; j++)
|
||||
{
|
||||
// mysql should haved eliminated duplicate projection columns
|
||||
if (oid == colTypes[j].columnOID || oid == colTypes[j].ddn.dictOID)
|
||||
{
|
||||
colTypes[j].colPosition = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
rowgroup::Row row;
|
||||
rowGroup->initRow(&row);
|
||||
rowGroup->getRow(ti.tpl_scan_ctx->rowsreturned, &row);
|
||||
int s;
|
||||
|
||||
for (int p = 0; p < num_attr; p++, f++)
|
||||
{
|
||||
//This col is going to be written
|
||||
bitmap_set_bit(ti.msTablePtr->write_set, (*f)->field_index);
|
||||
|
||||
// get coltype if not there yet
|
||||
if (colTypes[0].colWidth == 0)
|
||||
{
|
||||
for (short c = 0; c < num_attr; c++)
|
||||
{
|
||||
colTypes[c].colPosition = c;
|
||||
colTypes[c].colWidth = rowGroup->getColumnWidth(c);
|
||||
colTypes[c].colDataType = rowGroup->getColTypes()[c];
|
||||
colTypes[c].columnOID = rowGroup->getOIDs()[c];
|
||||
colTypes[c].scale = rowGroup->getScale()[c];
|
||||
colTypes[c].precision = rowGroup->getPrecision()[c];
|
||||
}
|
||||
}
|
||||
|
||||
CalpontSystemCatalog::ColType colType(colTypes[p]);
|
||||
|
||||
// table mode handling
|
||||
if (ti.tpl_scan_ctx->traceFlags & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF)
|
||||
{
|
||||
if (colType.colPosition == -1) // not projected by tuplejoblist
|
||||
continue;
|
||||
else
|
||||
s = colType.colPosition;
|
||||
}
|
||||
else
|
||||
{
|
||||
s = p;
|
||||
}
|
||||
|
||||
// precision == -16 is borrowed as skip null check indicator for bit ops.
|
||||
if (row.isNullValue(s) && colType.precision != -16)
|
||||
{
|
||||
// @2835. Handle empty string and null confusion. store empty string for string column
|
||||
if (colType.colDataType == CalpontSystemCatalog::CHAR ||
|
||||
colType.colDataType == CalpontSystemCatalog::VARCHAR ||
|
||||
colType.colDataType == CalpontSystemCatalog::VARBINARY)
|
||||
{
|
||||
Field_varstring* f2 = (Field_varstring*)*f;
|
||||
f2->store(tmp, 0, f2->charset());
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
// fetch and store data
|
||||
switch (colType.colDataType)
|
||||
{
|
||||
case CalpontSystemCatalog::DATE:
|
||||
{
|
||||
if ((*f)->null_ptr)
|
||||
*(*f)->null_ptr &= ~(*f)->null_bit;
|
||||
|
||||
intColVal = row.getUintField<4>(s);
|
||||
DataConvert::dateToString(intColVal, tmp, 255);
|
||||
Field_varstring* f2 = (Field_varstring*)*f;
|
||||
f2->store(tmp, strlen(tmp), f2->charset());
|
||||
f2->set_notnull();
|
||||
break;
|
||||
}
|
||||
|
||||
case CalpontSystemCatalog::DATETIME:
|
||||
{
|
||||
if ((*f)->null_ptr)
|
||||
*(*f)->null_ptr &= ~(*f)->null_bit;
|
||||
|
||||
intColVal = row.getUintField<8>(s);
|
||||
DataConvert::datetimeToString(intColVal, tmp, 255);
|
||||
|
||||
/* setting the field_length is a sort-of hack. The length
|
||||
* at this point can be long enough to include mseconds.
|
||||
* ColumnStore doesn't fully support mseconds yet so if
|
||||
* they are requested, trim them off.
|
||||
* At a later date we should set this more intelligently
|
||||
* based on the result set.
|
||||
*/
|
||||
/* MCOL-683: UTF-8 datetime no msecs is 57, this sometimes happens! */
|
||||
if (((*f)->field_length > 19) && ((*f)->field_length != 57))
|
||||
(*f)->field_length = strlen(tmp);
|
||||
|
||||
Field_varstring* f2 = (Field_varstring*)*f;
|
||||
f2->store(tmp, strlen(tmp), f2->charset());
|
||||
f2->set_notnull();
|
||||
break;
|
||||
}
|
||||
|
||||
case CalpontSystemCatalog::CHAR:
|
||||
case CalpontSystemCatalog::VARCHAR:
|
||||
{
|
||||
Field_varstring* f2 = (Field_varstring*)*f;
|
||||
|
||||
switch (colType.colWidth)
|
||||
{
|
||||
case 1:
|
||||
intColVal = row.getUintField<1>(s);
|
||||
f2->store((char*)(&intColVal), strlen((char*)(&intColVal)), f2->charset());
|
||||
break;
|
||||
|
||||
case 2:
|
||||
intColVal = row.getUintField<2>(s);
|
||||
f2->store((char*)(&intColVal), strlen((char*)(&intColVal)), f2->charset());
|
||||
break;
|
||||
|
||||
case 4:
|
||||
intColVal = row.getUintField<4>(s);
|
||||
f2->store((char*)(&intColVal), strlen((char*)(&intColVal)), f2->charset());
|
||||
break;
|
||||
|
||||
case 8:
|
||||
//make sure we don't send strlen off into the weeds...
|
||||
intColVal = row.getUintField<8>(s);
|
||||
memcpy(tmp, &intColVal, 8);
|
||||
tmp[8] = 0;
|
||||
f2->store(tmp, strlen(tmp), f2->charset());
|
||||
break;
|
||||
|
||||
default:
|
||||
f2->store((const char*)row.getStringPointer(s), row.getStringLength(s), f2->charset());
|
||||
}
|
||||
|
||||
if ((*f)->null_ptr)
|
||||
*(*f)->null_ptr &= ~(*f)->null_bit;
|
||||
f2->set_notnull();
|
||||
break;
|
||||
}
|
||||
|
||||
case CalpontSystemCatalog::VARBINARY:
|
||||
{
|
||||
Field_varstring* f2 = (Field_varstring*)*f;
|
||||
|
||||
if (current_thd->variables.infinidb_varbin_always_hex)
|
||||
{
|
||||
uint32_t l;
|
||||
const uint8_t* p = row.getVarBinaryField(l, s);
|
||||
uint32_t ll = l * 2;
|
||||
boost::scoped_array<char> sca(new char[ll]);
|
||||
vbin2hex(p, l, sca.get());
|
||||
f2->store(sca.get(), ll, f2->charset());
|
||||
}
|
||||
else
|
||||
f2->store((const char*)row.getVarBinaryField(s), row.getVarBinaryLength(s), f2->charset());
|
||||
f2->set_notnull();
|
||||
|
||||
if ((*f)->null_ptr)
|
||||
*(*f)->null_ptr &= ~(*f)->null_bit;
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case CalpontSystemCatalog::BIGINT:
|
||||
{
|
||||
intColVal = row.getIntField<8>(s);
|
||||
storeNumericFieldGroupBy(f, intColVal, colType);
|
||||
break;
|
||||
}
|
||||
|
||||
case CalpontSystemCatalog::UBIGINT:
|
||||
{
|
||||
uintColVal = row.getUintField<8>(s);
|
||||
storeNumericFieldGroupBy(f, uintColVal, colType);
|
||||
break;
|
||||
}
|
||||
|
||||
case CalpontSystemCatalog::INT:
|
||||
{
|
||||
intColVal = row.getIntField<4>(s);
|
||||
storeNumericFieldGroupBy(f, intColVal, colType);
|
||||
break;
|
||||
}
|
||||
|
||||
case CalpontSystemCatalog::UINT:
|
||||
{
|
||||
uintColVal = row.getUintField<4>(s);
|
||||
storeNumericFieldGroupBy(f, uintColVal, colType);
|
||||
break;
|
||||
}
|
||||
|
||||
case CalpontSystemCatalog::SMALLINT:
|
||||
{
|
||||
intColVal = row.getIntField<2>(s);
|
||||
storeNumericFieldGroupBy(f, intColVal, colType);
|
||||
break;
|
||||
}
|
||||
|
||||
case CalpontSystemCatalog::USMALLINT:
|
||||
{
|
||||
uintColVal = row.getUintField<2>(s);
|
||||
storeNumericFieldGroupBy(f, uintColVal, colType);
|
||||
break;
|
||||
}
|
||||
|
||||
case CalpontSystemCatalog::TINYINT:
|
||||
{
|
||||
intColVal = row.getIntField<1>(s);
|
||||
storeNumericFieldGroupBy(f, intColVal, colType);
|
||||
break;
|
||||
}
|
||||
|
||||
case CalpontSystemCatalog::UTINYINT:
|
||||
{
|
||||
uintColVal = row.getUintField<1>(s);
|
||||
storeNumericFieldGroupBy(f, uintColVal, colType);
|
||||
break;
|
||||
}
|
||||
|
||||
//In this case, we're trying to load a double output column with float data. This is the
|
||||
// case when you do sum(floatcol), e.g.
|
||||
case CalpontSystemCatalog::FLOAT:
|
||||
case CalpontSystemCatalog::UFLOAT:
|
||||
{
|
||||
float dl = row.getFloatField(s);
|
||||
|
||||
if (dl == std::numeric_limits<float>::infinity())
|
||||
continue;
|
||||
|
||||
//int64_t* icvp = (int64_t*)&dl;
|
||||
//intColVal = *icvp;
|
||||
Field_float* f2 = (Field_float*)*f;
|
||||
// bug 3485, reserve enough space for the longest float value
|
||||
// -3.402823466E+38 to -1.175494351E-38, 0, and
|
||||
// 1.175494351E-38 to 3.402823466E+38.
|
||||
(*f)->field_length = 40;
|
||||
|
||||
//float float_val = *(float*)(&value);
|
||||
//f2->store(float_val);
|
||||
if (f2->decimals() < (uint32_t)row.getScale(s))
|
||||
f2->dec = (uint32_t)row.getScale(s);
|
||||
|
||||
f2->store(dl);
|
||||
f2->set_notnull();
|
||||
if ((*f)->null_ptr)
|
||||
*(*f)->null_ptr &= ~(*f)->null_bit;
|
||||
|
||||
break;
|
||||
|
||||
//storeNumericField(f, intColVal, colType);
|
||||
//break;
|
||||
}
|
||||
|
||||
case CalpontSystemCatalog::DOUBLE:
|
||||
case CalpontSystemCatalog::UDOUBLE:
|
||||
{
|
||||
double dl = row.getDoubleField(s);
|
||||
|
||||
if (dl == std::numeric_limits<double>::infinity())
|
||||
continue;
|
||||
|
||||
Field_double* f2 = (Field_double*)*f;
|
||||
// bug 3483, reserve enough space for the longest double value
|
||||
// -1.7976931348623157E+308 to -2.2250738585072014E-308, 0, and
|
||||
// 2.2250738585072014E-308 to 1.7976931348623157E+308.
|
||||
(*f)->field_length = 310;
|
||||
|
||||
//double double_val = *(double*)(&value);
|
||||
//f2->store(double_val);
|
||||
if (f2->decimals() < (uint32_t)row.getScale(s))
|
||||
f2->dec = (uint32_t)row.getScale(s);
|
||||
|
||||
f2->store(dl);
|
||||
f2->set_notnull();
|
||||
|
||||
if ((*f)->null_ptr)
|
||||
*(*f)->null_ptr &= ~(*f)->null_bit;
|
||||
|
||||
break;
|
||||
|
||||
|
||||
//int64_t* icvp = (int64_t*)&dl;
|
||||
//intColVal = *icvp;
|
||||
//storeNumericField(f, intColVal, colType);
|
||||
//break;
|
||||
}
|
||||
|
||||
case CalpontSystemCatalog::DECIMAL:
|
||||
case CalpontSystemCatalog::UDECIMAL:
|
||||
{
|
||||
intColVal = row.getIntField(s);
|
||||
storeNumericFieldGroupBy(f, intColVal, colType);
|
||||
break;
|
||||
}
|
||||
|
||||
case CalpontSystemCatalog::BLOB:
|
||||
case CalpontSystemCatalog::TEXT:
|
||||
{
|
||||
Field_blob* f2 = (Field_blob*)*f;
|
||||
f2->set_ptr(row.getVarBinaryLength(s), (unsigned char*)row.getVarBinaryField(s));
|
||||
|
||||
if ((*f)->null_ptr)
|
||||
*(*f)->null_ptr &= ~(*f)->null_bit;
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
default: // treat as int64
|
||||
{
|
||||
intColVal = row.getUintField<8>(s);
|
||||
storeNumericFieldGroupBy(f, intColVal, colType);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ti.tpl_scan_ctx->rowsreturned++;
|
||||
ti.c++;
|
||||
#ifdef INFINIDB_DEBUG
|
||||
|
||||
if ((ti.c % 1000000) == 0)
|
||||
cerr << "fetchNextRow so far table " << ti.msTablePtr->s->table_name.str << " rows = " << ti.c << endl;
|
||||
|
||||
#endif
|
||||
ti.moreRows = true;
|
||||
rc = 0;
|
||||
}
|
||||
else if (sm_stat == sm::SQL_NOT_FOUND)
|
||||
{
|
||||
IDEBUG( cerr << "fetchNextRow done for table " << ti.msTablePtr->s->table_name.str << " rows = " << ti.c << endl );
|
||||
ti.c = 0;
|
||||
ti.moreRows = false;
|
||||
rc = HA_ERR_END_OF_FILE;
|
||||
}
|
||||
else if (sm_stat == sm::CALPONT_INTERNAL_ERROR)
|
||||
{
|
||||
ti.moreRows = false;
|
||||
rc = ER_INTERNAL_ERROR;
|
||||
ci->rc = rc;
|
||||
}
|
||||
else if ((uint32_t)sm_stat == logging::ERR_LOST_CONN_EXEMGR)
|
||||
{
|
||||
ti.moreRows = false;
|
||||
rc = logging::ERR_LOST_CONN_EXEMGR;
|
||||
sm::sm_init(tid2sid(current_thd->thread_id), &ci->cal_conn_hndl,
|
||||
current_thd->variables.infinidb_local_query);
|
||||
idbassert(ci->cal_conn_hndl != 0);
|
||||
ci->rc = rc;
|
||||
}
|
||||
else if (sm_stat == sm::SQL_KILLED)
|
||||
{
|
||||
// query was aborted by the user. treat it the same as limit query. close
|
||||
// connection after rnd_close.
|
||||
ti.c = 0;
|
||||
ti.moreRows = false;
|
||||
rc = HA_ERR_END_OF_FILE;
|
||||
ci->rc = rc;
|
||||
}
|
||||
else
|
||||
{
|
||||
ti.moreRows = false;
|
||||
rc = sm_stat;
|
||||
ci->rc = rc;
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
void makeUpdateScalarJoin(const ParseTree* n, void* obj)
|
||||
{
|
||||
TreeNode* tn = n->data();
|
||||
@ -5405,7 +5939,7 @@ int ha_calpont_impl_rnd_pos(uchar* buf, uchar* pos)
|
||||
***********************************************************/
|
||||
int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE* table)
|
||||
{
|
||||
//first_row= true;
|
||||
//first_row= true;
|
||||
string tableName = group_hand->table_list->table->s->table_name.str;
|
||||
IDEBUG( cout << "group_by_init for table " <<
|
||||
group_hand->table_list->table->s->table_name.str << endl );
|
||||
@ -5548,6 +6082,7 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
|
||||
|
||||
bool localQuery = (thd->variables.infinidb_local_query > 0 ? true : false);
|
||||
|
||||
/*
|
||||
// table mode
|
||||
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE)
|
||||
{
|
||||
@ -5626,8 +6161,9 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
|
||||
if (ci->tableMap.size() == 1)
|
||||
ti.csep->data(idb_mysql_query_str(thd));
|
||||
}
|
||||
*/
|
||||
// vtable mode
|
||||
else
|
||||
//else
|
||||
{
|
||||
//if (!ci->cal_conn_hndl || thd->infinidb_vtable.vtable_state == THD::INFINIDB_CREATE_VTABLE)
|
||||
if ( thd->infinidb_vtable.vtable_state == THD::INFINIDB_CREATE_VTABLE)
|
||||
@ -5702,6 +6238,23 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
|
||||
csep->verID(verID);
|
||||
csep->sessionID(sessionID);
|
||||
|
||||
if (group_hand->table_list->db_length)
|
||||
csep->schemaName(group_hand->table_list->db);
|
||||
|
||||
csep->traceFlags(ci->traceFlags);
|
||||
//ti.msTablePtr = group_hand->table_list->table;
|
||||
|
||||
gi.groupByTables = group_hand->table_list;
|
||||
gi.groupByFields = group_hand->select;
|
||||
gi.groupByWhere = group_hand->where;
|
||||
gi.groupByGroup = group_hand->group_by;
|
||||
gi.groupByOrder = group_hand->order_by;
|
||||
gi.groupByHaving = group_hand->having;
|
||||
gi.groupByDistinct = group_hand->distinct;
|
||||
|
||||
// send plan whenever group_init is called
|
||||
int status = cp_get_group_plan(thd, csep, gi);
|
||||
/*
|
||||
if (thd->db)
|
||||
csep->schemaName(thd->db);
|
||||
|
||||
@ -5712,7 +6265,7 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
|
||||
|
||||
//get plan
|
||||
int status = cp_get_plan(thd, csep);
|
||||
|
||||
*/
|
||||
//if (cp_get_plan(thd, csep) != 0)
|
||||
if (status > 0)
|
||||
goto internal_error;
|
||||
@ -5887,6 +6440,9 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
|
||||
ti = ci->tableMap[table];
|
||||
ti.msTablePtr = table;
|
||||
|
||||
// MCOL-1052
|
||||
thd->infinidb_vtable.vtable_state = THD::INFINIDB_SELECT_VTABLE;
|
||||
|
||||
if ((thd->infinidb_vtable.vtable_state == THD::INFINIDB_SELECT_VTABLE) ||
|
||||
(thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) ||
|
||||
(thd->infinidb_vtable.vtable_state == THD::INFINIDB_REDO_QUERY))
|
||||
@ -6037,9 +6593,8 @@ int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE
|
||||
return ER_INTERNAL_ERROR;
|
||||
|
||||
// @bug 3005
|
||||
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_SELECT_VTABLE &&
|
||||
string(table->s->table_name.str).find("$vtable") != 0)
|
||||
return HA_ERR_END_OF_FILE;
|
||||
//if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_SELECT_VTABLE)
|
||||
// return HA_ERR_END_OF_FILE;
|
||||
|
||||
if (((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) ||
|
||||
((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
|
||||
@ -6051,13 +6606,13 @@ int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE
|
||||
|
||||
// @bug 2232. Basic SP support
|
||||
// @bug 3939. Only error out for sp with select. Let pass for alter table in sp.
|
||||
if (thd->infinidb_vtable.call_sp && (thd->lex)->sql_command != SQLCOM_ALTER_TABLE)
|
||||
/*if (thd->infinidb_vtable.call_sp && (thd->lex)->sql_command != SQLCOM_ALTER_TABLE)
|
||||
{
|
||||
setError(thd, ER_CHECK_NOT_IMPLEMENTED, "This stored procedure syntax is not supported by Columnstore in this version");
|
||||
thd->infinidb_vtable.vtable_state = THD::INFINIDB_ERROR;
|
||||
return ER_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
*/
|
||||
if (!thd->infinidb_vtable.cal_conn_info)
|
||||
thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
|
||||
|
||||
@ -6105,8 +6660,9 @@ int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE
|
||||
|
||||
try
|
||||
{
|
||||
rc = fetchNextRowGrHand(ti, ci);
|
||||
rc = 0;
|
||||
rc = fetchNextRowGrHandGroupBy(ti, ci);
|
||||
// MCOL-1052
|
||||
//rc = 0;
|
||||
}
|
||||
catch (std::exception& e)
|
||||
{
|
||||
@ -6143,7 +6699,189 @@ int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE
|
||||
|
||||
int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* table)
|
||||
{
|
||||
return 0;
|
||||
int rc = 0;
|
||||
THD* thd = current_thd;
|
||||
cal_connection_info* ci = NULL;
|
||||
|
||||
|
||||
if (thd->slave_thread && (
|
||||
thd->lex->sql_command == SQLCOM_INSERT ||
|
||||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
|
||||
thd->lex->sql_command == SQLCOM_UPDATE ||
|
||||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
|
||||
thd->lex->sql_command == SQLCOM_DELETE ||
|
||||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
|
||||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
|
||||
thd->lex->sql_command == SQLCOM_LOAD))
|
||||
return 0;
|
||||
|
||||
thd->infinidb_vtable.isNewQuery = true;
|
||||
|
||||
if (thd->infinidb_vtable.cal_conn_info)
|
||||
ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
|
||||
|
||||
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ORDER_BY )
|
||||
{
|
||||
thd->infinidb_vtable.vtable_state = THD::INFINIDB_SELECT_VTABLE;// flip back to normal state
|
||||
return rc;
|
||||
}
|
||||
|
||||
// if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_REDO_PHASE1)
|
||||
// return rc;
|
||||
/*
|
||||
if ( (thd->lex)->sql_command == SQLCOM_ALTER_TABLE )
|
||||
return rc;
|
||||
|
||||
if (((thd->lex)->sql_command == SQLCOM_UPDATE) ||
|
||||
((thd->lex)->sql_command == SQLCOM_DELETE) ||
|
||||
((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) ||
|
||||
((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
|
||||
return rc;
|
||||
*/
|
||||
if (((thd->lex)->sql_command == SQLCOM_INSERT) ||
|
||||
((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) )
|
||||
{
|
||||
// @bug 4022. error handling for select part of dml
|
||||
if (ci->cal_conn_hndl && ci->rc)
|
||||
{
|
||||
// send ExeMgr a signal before closing the connection
|
||||
ByteStream msg;
|
||||
ByteStream::quadbyte qb = 0;
|
||||
msg << qb;
|
||||
|
||||
try
|
||||
{
|
||||
ci->cal_conn_hndl->exeMgr->write(msg);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
// this is error handling, so ignore connection failure.
|
||||
}
|
||||
|
||||
sm::sm_cleanup(ci->cal_conn_hndl);
|
||||
ci->cal_conn_hndl = 0;
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
|
||||
if (!ci)
|
||||
{
|
||||
thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
|
||||
ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
|
||||
}
|
||||
|
||||
// @bug 3078. Also session limit variable works the same as ctrl+c
|
||||
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD ||
|
||||
((thd->lex)->sql_command != SQLCOM_INSERT &&
|
||||
(thd->lex)->sql_command != SQLCOM_INSERT_SELECT &&
|
||||
thd->variables.select_limit != (uint64_t) - 1))
|
||||
{
|
||||
if (ci->cal_conn_hndl)
|
||||
{
|
||||
// send ExeMgr a signal before closing the connection
|
||||
ByteStream msg;
|
||||
ByteStream::quadbyte qb = 0;
|
||||
msg << qb;
|
||||
|
||||
try
|
||||
{
|
||||
ci->cal_conn_hndl->exeMgr->write(msg);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
// this is the end of query. Ignore the exception if exemgr connection failed
|
||||
// for whatever reason.
|
||||
}
|
||||
|
||||
sm::sm_cleanup(ci->cal_conn_hndl);
|
||||
ci->cal_conn_hndl = 0;
|
||||
// clear querystats because no query stats available for cancelled query
|
||||
ci->queryStats = "";
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
IDEBUG( cerr << "group_by_end for table " << table->s->table_name.str << endl );
|
||||
|
||||
cal_table_info ti = ci->tableMap[table];
|
||||
sm::cpsm_conhdl_t* hndl;
|
||||
|
||||
//if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE)
|
||||
// hndl = ti.conn_hndl;
|
||||
//else
|
||||
hndl = ci->cal_conn_hndl;
|
||||
|
||||
if (ti.tpl_ctx)
|
||||
{
|
||||
if (ti.tpl_scan_ctx.get())
|
||||
{
|
||||
try
|
||||
{
|
||||
sm::tpl_scan_close(ti.tpl_scan_ctx);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
rc = ER_INTERNAL_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
ti.tpl_scan_ctx.reset();
|
||||
|
||||
try
|
||||
{
|
||||
sm::tpl_close(ti.tpl_ctx, &hndl, ci->stats);
|
||||
|
||||
// set conn hndl back. could be changed in tpl_close
|
||||
//if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE)
|
||||
// ti.conn_hndl = hndl;
|
||||
//else
|
||||
ci->cal_conn_hndl = hndl;
|
||||
|
||||
ti.tpl_ctx = 0;
|
||||
}
|
||||
catch (IDBExcept& e)
|
||||
{
|
||||
if (e.errorCode() == ERR_CROSS_ENGINE_CONNECT || e.errorCode() == ERR_CROSS_ENGINE_CONFIG)
|
||||
{
|
||||
string msg = string("Columnstore Query Stats - ") + e.what();
|
||||
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
|
||||
}
|
||||
else
|
||||
{
|
||||
setError(thd, ER_INTERNAL_ERROR, e.what());
|
||||
rc = ER_INTERNAL_ERROR;
|
||||
}
|
||||
}
|
||||
catch (std::exception& e)
|
||||
{
|
||||
setError(thd, ER_INTERNAL_ERROR, e.what());
|
||||
rc = ER_INTERNAL_ERROR;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
setError(thd, ER_INTERNAL_ERROR, "Internal error throwed in rnd_end");
|
||||
rc = ER_INTERNAL_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
ti.tpl_ctx = 0;
|
||||
|
||||
/*
|
||||
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_SELECT_VTABLE &&
|
||||
thd->infinidb_vtable.has_order_by)
|
||||
thd->infinidb_vtable.vtable_state = THD::INFINIDB_ORDER_BY;
|
||||
*/
|
||||
ci->tableMap[table] = ti;
|
||||
|
||||
// push warnings from CREATE phase
|
||||
if (!ci->warningMsg.empty())
|
||||
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, ci->warningMsg.c_str());
|
||||
|
||||
ci->warningMsg.clear();
|
||||
// reset expressionId just in case
|
||||
ci->expressionId = 0;
|
||||
return rc;
|
||||
}
|
||||
|
||||
// vim:sw=4 ts=4:
|
||||
|
Reference in New Issue
Block a user