You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-07 03:22:57 +03:00
MCOL-267 Add basic engine support
This patch adds enough support so that cross engines joins with blob columns in the foreign engines will work. The modifications are as follows: * Add CrossEngine support for non-NULL-terminated (binary) data * Add row data support for blobs (similar to varbinary) * Add engine support for writing out blob data correctly to the storage engine API * Re-enable blob support in the engine plugin
This commit is contained in:
@@ -904,7 +904,8 @@ const CalpontSystemCatalog::TableAliasName make_aliasview(const std::string& s,
|
|||||||
*/
|
*/
|
||||||
inline bool isCharType(const execplan::CalpontSystemCatalog::ColDataType type)
|
inline bool isCharType(const execplan::CalpontSystemCatalog::ColDataType type)
|
||||||
{
|
{
|
||||||
return (execplan::CalpontSystemCatalog::VARCHAR == type || execplan::CalpontSystemCatalog::CHAR == type);
|
return (execplan::CalpontSystemCatalog::VARCHAR == type ||
|
||||||
|
execplan::CalpontSystemCatalog::CHAR == type);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** convenience function to determine if column type is an
|
/** convenience function to determine if column type is an
|
||||||
|
@@ -231,18 +231,25 @@ void CrossEngineStep::makeMappings()
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void CrossEngineStep::setField(int i, const char* value, Row& row)
|
void CrossEngineStep::setField(int i, const char* value, unsigned long length, Row& row)
|
||||||
{
|
{
|
||||||
CalpontSystemCatalog::ColDataType colType = row.getColType(i);
|
CalpontSystemCatalog::ColDataType colType = row.getColType(i);
|
||||||
|
|
||||||
if ((colType == CalpontSystemCatalog::CHAR || colType == CalpontSystemCatalog::VARCHAR) &&
|
if (((colType == CalpontSystemCatalog::CHAR || colType == CalpontSystemCatalog::VARCHAR) &&
|
||||||
row.getColumnWidth(i) > 8)
|
row.getColumnWidth(i) > 8))
|
||||||
{
|
{
|
||||||
if (value != NULL)
|
if (value != NULL)
|
||||||
row.setStringField(value, i);
|
row.setStringField(value, i);
|
||||||
else
|
else
|
||||||
row.setStringField("", i);
|
row.setStringField("", i);
|
||||||
}
|
}
|
||||||
|
else if ((colType == CalpontSystemCatalog::BLOB) || (colType == CalpontSystemCatalog::VARBINARY))
|
||||||
|
{
|
||||||
|
if (value != NULL)
|
||||||
|
row.setVarBinaryField((const uint8_t*)value, length, i);
|
||||||
|
else
|
||||||
|
row.setVarBinaryField(NULL, 0, i);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
CalpontSystemCatalog::ColType ct;
|
CalpontSystemCatalog::ColType ct;
|
||||||
@@ -484,7 +491,7 @@ void CrossEngineStep::execute()
|
|||||||
while ((rowIn = mysql->nextRow()) && !cancelled())
|
while ((rowIn = mysql->nextRow()) && !cancelled())
|
||||||
{
|
{
|
||||||
for(int i = 0; i < num_fields; i++)
|
for(int i = 0; i < num_fields; i++)
|
||||||
setField(i, rowIn[i], fRowDelivered);
|
setField(i, rowIn[i], mysql->getFieldLength(i), fRowDelivered);
|
||||||
|
|
||||||
addRow(rgDataDelivered);
|
addRow(rgDataDelivered);
|
||||||
}
|
}
|
||||||
@@ -504,7 +511,7 @@ void CrossEngineStep::execute()
|
|||||||
for(int i = 0; i < num_fields; i++)
|
for(int i = 0; i < num_fields; i++)
|
||||||
{
|
{
|
||||||
if (fFe1Column[i] != -1)
|
if (fFe1Column[i] != -1)
|
||||||
setField(fFe1Column[i], rowIn[i], rowFe1);
|
setField(fFe1Column[i], rowIn[i], mysql->getFieldLength(i), rowFe1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fFeFilters && fFeInstance->evaluate(rowFe1, fFeFilters.get()) == false)
|
if (fFeFilters && fFeInstance->evaluate(rowFe1, fFeFilters.get()) == false)
|
||||||
@@ -518,7 +525,7 @@ void CrossEngineStep::execute()
|
|||||||
for(int i = 0; i < num_fields; i++)
|
for(int i = 0; i < num_fields; i++)
|
||||||
{
|
{
|
||||||
if (fFe1Column[i] == -1)
|
if (fFe1Column[i] == -1)
|
||||||
setField(i, rowIn[i], fRowDelivered);
|
setField(i, rowIn[i], mysql->getFieldLength(i), fRowDelivered);
|
||||||
}
|
}
|
||||||
|
|
||||||
addRow(rgDataDelivered);
|
addRow(rgDataDelivered);
|
||||||
@@ -536,7 +543,7 @@ void CrossEngineStep::execute()
|
|||||||
while ((rowIn = mysql->nextRow()) && !cancelled())
|
while ((rowIn = mysql->nextRow()) && !cancelled())
|
||||||
{
|
{
|
||||||
for(int i = 0; i < num_fields; i++)
|
for(int i = 0; i < num_fields; i++)
|
||||||
setField(i, rowIn[i], rowFe3);
|
setField(i, rowIn[i], mysql->getFieldLength(i), rowFe3);
|
||||||
|
|
||||||
fFeInstance->evaluate(rowFe3, fFeSelects);
|
fFeInstance->evaluate(rowFe3, fFeSelects);
|
||||||
fFeInstance->evaluate(rowFe3, fFeSelects);
|
fFeInstance->evaluate(rowFe3, fFeSelects);
|
||||||
@@ -567,7 +574,7 @@ void CrossEngineStep::execute()
|
|||||||
for(int i = 0; i < num_fields; i++)
|
for(int i = 0; i < num_fields; i++)
|
||||||
{
|
{
|
||||||
if (fFe1Column[i] != -1)
|
if (fFe1Column[i] != -1)
|
||||||
setField(fFe1Column[i], rowIn[i], rowFe1);
|
setField(fFe1Column[i], rowIn[i], mysql->getFieldLength(i), rowFe1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fFeFilters && fFeInstance->evaluate(rowFe1, fFeFilters.get()) == false)
|
if (fFeFilters && fFeInstance->evaluate(rowFe1, fFeFilters.get()) == false)
|
||||||
@@ -581,7 +588,7 @@ void CrossEngineStep::execute()
|
|||||||
for(int i = 0; i < num_fields; i++)
|
for(int i = 0; i < num_fields; i++)
|
||||||
{
|
{
|
||||||
if (fFe1Column[i] == -1)
|
if (fFe1Column[i] == -1)
|
||||||
setField(i, rowIn[i], rowFe3);
|
setField(i, rowIn[i], mysql->getFieldLength(i), rowFe3);
|
||||||
}
|
}
|
||||||
|
|
||||||
fFeInstance->evaluate(rowFe3, fFeSelects);
|
fFeInstance->evaluate(rowFe3, fFeSelects);
|
||||||
|
@@ -60,13 +60,20 @@ public:
|
|||||||
|
|
||||||
int getFieldCount() { return mysql_num_fields(fRes); }
|
int getFieldCount() { return mysql_num_fields(fRes); }
|
||||||
int getRowCount() { return mysql_num_rows(fRes); }
|
int getRowCount() { return mysql_num_rows(fRes); }
|
||||||
char** nextRow() { return mysql_fetch_row(fRes); }
|
char** nextRow()
|
||||||
|
{
|
||||||
|
char** row = mysql_fetch_row(fRes);
|
||||||
|
fieldLengths = mysql_fetch_lengths(fRes);
|
||||||
|
return row;
|
||||||
|
}
|
||||||
|
long getFieldLength(int field) { return fieldLengths[field]; }
|
||||||
const std::string& getError() { return fErrStr; }
|
const std::string& getError() { return fErrStr; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
MYSQL* fCon;
|
MYSQL* fCon;
|
||||||
MYSQL_RES* fRes;
|
MYSQL_RES* fRes;
|
||||||
std::string fErrStr;
|
std::string fErrStr;
|
||||||
|
unsigned long *fieldLengths;
|
||||||
};
|
};
|
||||||
|
|
||||||
/** @brief class CrossEngineStep
|
/** @brief class CrossEngineStep
|
||||||
@@ -150,7 +157,7 @@ protected:
|
|||||||
virtual void makeMappings();
|
virtual void makeMappings();
|
||||||
virtual void addFilterStr(const std::vector<const execplan::Filter*>&, const std::string&);
|
virtual void addFilterStr(const std::vector<const execplan::Filter*>&, const std::string&);
|
||||||
virtual std::string makeQuery();
|
virtual std::string makeQuery();
|
||||||
virtual void setField(int, const char*, rowgroup::Row&);
|
virtual void setField(int, const char*, unsigned long, rowgroup::Row&);
|
||||||
inline void addRow(rowgroup::RGData &);
|
inline void addRow(rowgroup::RGData &);
|
||||||
//inline void addRow(boost::shared_array<uint8_t>&);
|
//inline void addRow(boost::shared_array<uint8_t>&);
|
||||||
virtual int64_t convertValueNum(
|
virtual int64_t convertValueNum(
|
||||||
|
@@ -2233,10 +2233,10 @@ CalpontSystemCatalog::ColType colType_MysqlToIDB (const Item* item)
|
|||||||
ct.colDataType = CalpontSystemCatalog::DATETIME;
|
ct.colDataType = CalpontSystemCatalog::DATETIME;
|
||||||
ct.colWidth = 8;
|
ct.colWidth = 8;
|
||||||
}
|
}
|
||||||
if (item->field_type() == MYSQL_TYPE_BLOB)
|
if (item->field_type() == MYSQL_TYPE_BLOB)
|
||||||
{
|
{
|
||||||
throw runtime_error ("BLOB/TEXT data types are not supported by ColumnStore.");
|
ct.colDataType = CalpontSystemCatalog::BLOB;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
/* FIXME:
|
/* FIXME:
|
||||||
|
@@ -467,6 +467,7 @@ int fetchNextRow(uchar *buf, cal_table_info& ti, cal_connection_info* ci)
|
|||||||
case CalpontSystemCatalog::CHAR:
|
case CalpontSystemCatalog::CHAR:
|
||||||
case CalpontSystemCatalog::VARCHAR:
|
case CalpontSystemCatalog::VARCHAR:
|
||||||
{
|
{
|
||||||
|
// TODO: use getStringPointer instead of getStringField to stop the string copies
|
||||||
Field_varstring* f2 = (Field_varstring*)*f;
|
Field_varstring* f2 = (Field_varstring*)*f;
|
||||||
switch (colType.colWidth)
|
switch (colType.colWidth)
|
||||||
{
|
{
|
||||||
@@ -622,6 +623,14 @@ int fetchNextRow(uchar *buf, cal_table_info& ti, cal_connection_info* ci)
|
|||||||
storeNumericField(f, intColVal, colType);
|
storeNumericField(f, intColVal, colType);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case CalpontSystemCatalog::BLOB:
|
||||||
|
{
|
||||||
|
Field_blob *f2 = (Field_blob*)*f;
|
||||||
|
f2->set_ptr(row.getVarBinaryLength(s), (unsigned char*)row.getVarBinaryField(s));
|
||||||
|
if ((*f)->null_ptr)
|
||||||
|
*(*f)->null_ptr &= ~(*f)->null_bit;
|
||||||
|
break;
|
||||||
|
}
|
||||||
default: // treat as int64
|
default: // treat as int64
|
||||||
{
|
{
|
||||||
intColVal = row.getUintField<8>(s);
|
intColVal = row.getUintField<8>(s);
|
||||||
|
@@ -365,7 +365,8 @@ string Row::toString() const
|
|||||||
else
|
else
|
||||||
switch (types[i]) {
|
switch (types[i]) {
|
||||||
case CalpontSystemCatalog::CHAR:
|
case CalpontSystemCatalog::CHAR:
|
||||||
case CalpontSystemCatalog::VARCHAR: {
|
case CalpontSystemCatalog::VARCHAR:
|
||||||
|
{
|
||||||
const string &tmp = getStringField(i);
|
const string &tmp = getStringField(i);
|
||||||
os << "(" << getStringLength(i) << ") '" << tmp << "' ";
|
os << "(" << getStringLength(i) << ") '" << tmp << "' ";
|
||||||
break;
|
break;
|
||||||
@@ -381,7 +382,9 @@ string Row::toString() const
|
|||||||
case CalpontSystemCatalog::LONGDOUBLE:
|
case CalpontSystemCatalog::LONGDOUBLE:
|
||||||
os << getLongDoubleField(i) << " ";
|
os << getLongDoubleField(i) << " ";
|
||||||
break;
|
break;
|
||||||
case CalpontSystemCatalog::VARBINARY: {
|
case CalpontSystemCatalog::VARBINARY:
|
||||||
|
case CalpontSystemCatalog::BLOB:
|
||||||
|
{
|
||||||
uint32_t len = getVarBinaryLength(i);
|
uint32_t len = getVarBinaryLength(i);
|
||||||
const uint8_t* val = getVarBinaryField(i);
|
const uint8_t* val = getVarBinaryField(i);
|
||||||
os << "0x" << hex;
|
os << "0x" << hex;
|
||||||
@@ -429,7 +432,9 @@ string Row::toCSV() const
|
|||||||
case CalpontSystemCatalog::LONGDOUBLE:
|
case CalpontSystemCatalog::LONGDOUBLE:
|
||||||
os << getLongDoubleField(i);
|
os << getLongDoubleField(i);
|
||||||
break;
|
break;
|
||||||
case CalpontSystemCatalog::VARBINARY: {
|
case CalpontSystemCatalog::VARBINARY:
|
||||||
|
case CalpontSystemCatalog::BLOB:
|
||||||
|
{
|
||||||
uint32_t len = getVarBinaryLength(i);
|
uint32_t len = getVarBinaryLength(i);
|
||||||
const uint8_t* val = getVarBinaryField(i);
|
const uint8_t* val = getVarBinaryField(i);
|
||||||
os << "0x" << hex;
|
os << "0x" << hex;
|
||||||
@@ -532,6 +537,11 @@ void Row::initToNull()
|
|||||||
memset(&data[offsets[i]], 0xFF, getColumnWidth(i));
|
memset(&data[offsets[i]], 0xFF, getColumnWidth(i));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case CalpontSystemCatalog::BLOB: {
|
||||||
|
// TODO: no NULL value for long double yet, this is a nan.
|
||||||
|
memset(&data[offsets[i]], 0xFF, getColumnWidth(i));
|
||||||
|
break;
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
ostringstream os;
|
ostringstream os;
|
||||||
os << "Row::initToNull(): got bad column type (" << types[i] <<
|
os << "Row::initToNull(): got bad column type (" << types[i] <<
|
||||||
@@ -603,6 +613,7 @@ bool Row::isNullValue(uint32_t colIndex) const
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case CalpontSystemCatalog::BLOB:
|
||||||
case CalpontSystemCatalog::VARBINARY: {
|
case CalpontSystemCatalog::VARBINARY: {
|
||||||
uint32_t pos = offsets[colIndex];
|
uint32_t pos = offsets[colIndex];
|
||||||
if (inStringTable(colIndex)) {
|
if (inStringTable(colIndex)) {
|
||||||
@@ -1100,7 +1111,7 @@ void applyMapping(const int *mapping, const Row &in, Row *out)
|
|||||||
//out->setStringField(in.getStringField(i), mapping[i]);
|
//out->setStringField(in.getStringField(i), mapping[i]);
|
||||||
else if (UNLIKELY(in.isShortString(i)))
|
else if (UNLIKELY(in.isShortString(i)))
|
||||||
out->setUintField(in.getUintField(i), mapping[i]);
|
out->setUintField(in.getUintField(i), mapping[i]);
|
||||||
else if (UNLIKELY(in.getColTypes()[i] == execplan::CalpontSystemCatalog::VARBINARY))
|
else if (UNLIKELY(in.getColTypes()[i] == execplan::CalpontSystemCatalog::VARBINARY || in.getColTypes()[i] == execplan::CalpontSystemCatalog::BLOB))
|
||||||
out->setVarBinaryField(in.getVarBinaryField(i), in.getVarBinaryLength(i), mapping[i]);
|
out->setVarBinaryField(in.getVarBinaryField(i), in.getVarBinaryLength(i), mapping[i]);
|
||||||
else if (UNLIKELY(in.getColTypes()[i] == execplan::CalpontSystemCatalog::LONGDOUBLE))
|
else if (UNLIKELY(in.getColTypes()[i] == execplan::CalpontSystemCatalog::LONGDOUBLE))
|
||||||
out->setLongDoubleField(in.getLongDoubleField(i), mapping[i]);
|
out->setLongDoubleField(in.getLongDoubleField(i), mapping[i]);
|
||||||
|
@@ -1421,7 +1421,7 @@ inline void copyRow(const Row &in, Row *out, uint32_t colCount)
|
|||||||
}
|
}
|
||||||
|
|
||||||
for (uint32_t i = 0; i < colCount; i++) {
|
for (uint32_t i = 0; i < colCount; i++) {
|
||||||
if (UNLIKELY(in.getColTypes()[i] == execplan::CalpontSystemCatalog::VARBINARY))
|
if (UNLIKELY(in.getColTypes()[i] == execplan::CalpontSystemCatalog::VARBINARY || in.getColTypes()[i] == execplan::CalpontSystemCatalog::BLOB))
|
||||||
out->setVarBinaryField(in.getVarBinaryStringField(i), i);
|
out->setVarBinaryField(in.getVarBinaryStringField(i), i);
|
||||||
else if (UNLIKELY(in.isLongString(i)))
|
else if (UNLIKELY(in.isLongString(i)))
|
||||||
//out->setStringField(in.getStringField(i), i);
|
//out->setStringField(in.getStringField(i), i);
|
||||||
|
Reference in New Issue
Block a user