1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-265 Add support for TIMESTAMP data type

This commit is contained in:
Gagan Goel
2019-03-17 14:14:03 -04:00
parent 8a7ccd7d93
commit e89d1ac3cf
167 changed files with 4346 additions and 250 deletions

View File

@ -600,6 +600,19 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci, bool h
break;
}
case CalpontSystemCatalog::TIMESTAMP:
{
if ((*f)->null_ptr)
*(*f)->null_ptr &= ~(*f)->null_bit;
intColVal = row.getUintField<8>(s);
DataConvert::timestampToString(intColVal, tmp, 255, current_thd->variables.time_zone->get_name()->ptr(), colType.precision);
Field_varstring* f2 = (Field_varstring*)*f;
f2->store(tmp, strlen(tmp), f2->charset());
break;
}
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR:
{
@ -1005,6 +1018,205 @@ void makeUpdateSemiJoin(const ParseTree* n, void* obj)
return;
}
vector<string> getOnUpdateTimestampColumns(string& schema, string& tableName, int sessionID)
{
vector<string> returnVal;
typedef CalpontSelectExecutionPlan::ColumnMap::value_type CMVT_;
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
csc->identity(execplan::CalpontSystemCatalog::FE);
CalpontSystemCatalog::TableName aTableName;
boost::algorithm::to_lower(schema);
boost::algorithm::to_lower(tableName);
// select columnname from calpontsys.syscolumn
// where schema = schema and tablename = tableName
// and datatype = 'timestamp'
// and defaultvalue = 'current_timestamp() ON UPDATE current_timestamp()'
CalpontSelectExecutionPlan csep;
CalpontSelectExecutionPlan::ReturnedColumnList returnedColumnList;
CalpontSelectExecutionPlan::FilterTokenList filterTokenList;
CalpontSelectExecutionPlan::ColumnMap colMap;
SessionManager sm;
BRM::TxnID txnID;
txnID = sm.getTxnID(sessionID);
if (!txnID.valid)
{
txnID.id = 0;
txnID.valid = true;
}
QueryContext verID;
verID = sm.verID();
csep.txnID(txnID.id);
csep.verID(verID);
csep.sessionID(sessionID);
string sysTable = "calpontsys.syscolumn.";
string firstCol = sysTable + "columnname";
SimpleColumn* c1 = new SimpleColumn(firstCol, sessionID);
string secondCol = sysTable + "schema";
SimpleColumn* c2 = new SimpleColumn(secondCol, sessionID);
string thirdCol = sysTable + "tablename";
SimpleColumn* c3 = new SimpleColumn(thirdCol, sessionID);
string fourthCol = sysTable + "datatype";
SimpleColumn* c4 = new SimpleColumn(fourthCol, sessionID);
string fifthCol = sysTable + "defaultvalue";
SimpleColumn* c5 = new SimpleColumn(fifthCol, sessionID);
SRCP srcp;
srcp.reset(c1);
colMap.insert(CMVT_(firstCol, srcp));
srcp.reset(c2);
colMap.insert(CMVT_(secondCol, srcp));
srcp.reset(c3);
colMap.insert(CMVT_(thirdCol, srcp));
srcp.reset(c4);
colMap.insert(CMVT_(fourthCol, srcp));
srcp.reset(c5);
colMap.insert(CMVT_(fifthCol, srcp));
csep.columnMapNonStatic(colMap);
srcp.reset(c1->clone());
returnedColumnList.push_back(srcp);
csep.returnedCols(returnedColumnList);
// Filters
const SOP opeq(new Operator("="));
SimpleFilter* f1 = new SimpleFilter (opeq,
c2->clone(),
new ConstantColumn(schema, ConstantColumn::LITERAL));
filterTokenList.push_back(f1);
filterTokenList.push_back(new Operator("and"));
SimpleFilter* f2 = new SimpleFilter (opeq,
c3->clone(),
new ConstantColumn(tableName, ConstantColumn::LITERAL));
filterTokenList.push_back(f2);
filterTokenList.push_back(new Operator("and"));
SimpleFilter* f3 = new SimpleFilter (opeq,
c4->clone(),
new ConstantColumn((uint64_t) execplan::CalpontSystemCatalog::TIMESTAMP, ConstantColumn::NUM));
filterTokenList.push_back(f3);
filterTokenList.push_back(new Operator("and"));
string defaultValue = "current_timestamp() ON UPDATE current_timestamp()";
SimpleFilter* f4 = new SimpleFilter (opeq,
c5->clone(),
new ConstantColumn(defaultValue, ConstantColumn::LITERAL));
filterTokenList.push_back(f4);
csep.filterTokenList(filterTokenList);
CalpontSelectExecutionPlan::TableList tablelist;
tablelist.push_back(make_aliastable("calpontsys", "syscolumn", ""));
csep.tableList(tablelist);
boost::shared_ptr<messageqcpp::MessageQueueClient> exemgrClient (new messageqcpp::MessageQueueClient("ExeMgr1"));
ByteStream msg, emsgBs;
rowgroup::RGData rgData;
ByteStream::quadbyte qb = 4;
msg << qb;
rowgroup::RowGroup* rowGroup = 0;
uint32_t rowCount;
exemgrClient->write(msg);
ByteStream msgPlan;
csep.serialize(msgPlan);
exemgrClient->write(msgPlan);
msg.restart();
msg = exemgrClient->read(); //error handling
emsgBs = exemgrClient->read();
ByteStream::quadbyte qb1;
if (emsgBs.length() == 0)
{
//exemgrClient->shutdown();
//delete exemgrClient;
//exemgrClient = 0;
throw runtime_error("Lost conection to ExeMgr.");
}
string emsgStr;
emsgBs >> emsgStr;
if (msg.length() == 4)
{
msg >> qb1;
if (qb1 != 0)
{
//exemgrClient->shutdown();
//delete exemgrClient;
//exemgrClient = 0;
throw runtime_error(emsgStr);
}
}
while (true)
{
msg.restart();
msg = exemgrClient->read();
if ( msg.length() == 0 )
{
//exemgrClient->shutdown();
//delete exemgrClient;
//exemgrClient = 0;
throw runtime_error("Lost conection to ExeMgr.");
}
else
{
if (!rowGroup)
{
//This is mete data
rowGroup = new rowgroup::RowGroup();
rowGroup->deserialize(msg);
qb = 100;
msg.restart();
msg << qb;
exemgrClient->write(msg);
continue;
}
rgData.deserialize(msg);
rowGroup->setData(&rgData);
if (rowGroup->getStatus() != 0)
{
//msg.advance(rowGroup->getDataSize());
msg >> emsgStr;
//exemgrClient->shutdown();
//delete exemgrClient;
//exemgrClient = 0;
throw runtime_error(emsgStr);
}
rowCount = rowGroup->getRowCount();
if (rowCount > 0)
{
rowgroup::Row row;
rowGroup->initRow(&row);
for (uint32_t i = 0; i < rowCount; i++)
{
rowGroup->getRow(i, &row);
// we are only fetching a single column
returnVal.push_back(row.getStringField(0));
}
}
else
{
break;
}
//exemgrClient->shutdown();
//delete exemgrClient;
//exemgrClient = 0;
}
}
return returnVal;
}
uint32_t doUpdateDelete(THD* thd)
{
if (get_fe_conn_info_ptr() == NULL)
@ -1123,6 +1335,7 @@ uint32_t doUpdateDelete(THD* thd)
updateCP->queryType(CalpontSelectExecutionPlan::UPDATE);
ci->stats.fQueryType = updateCP->queryType();
uint32_t cnt = 0;
tr1::unordered_set<string> timeStampColumnNames;
// for (; table_ptr; table_ptr= table_ptr->next_local)
// {
@ -1176,6 +1389,12 @@ uint32_t doUpdateDelete(THD* thd)
else
schemaName = string(item->db_name);
if (item->field_type() == MYSQL_TYPE_TIMESTAMP ||
item->field_type() == MYSQL_TYPE_TIMESTAMP2)
{
timeStampColumnNames.insert(string(item->name.str));
}
columnAssignmentPtr = new ColumnAssignment();
columnAssignmentPtr->fColumn = string(item->name.str);
columnAssignmentPtr->fOperator = "=";
@ -1364,6 +1583,30 @@ uint32_t doUpdateDelete(THD* thd)
// if (cnt < thd->lex->select_lex.item_list.elements)
// dmlStmt += ", ";
}
// Support for on update current_timestamp() for timestamp fields
// Query calpontsys.syscolumn to get all timestamp columns with
// ON UPDATE current_timestamp() property
vector<string> onUpdateTimeStampColumns = getOnUpdateTimestampColumns(schemaName, tableName, tid2sid(thd->thread_id));
for (size_t i = 0; i < onUpdateTimeStampColumns.size(); i++)
{
if (timeStampColumnNames.find(onUpdateTimeStampColumns[i]) == timeStampColumnNames.end())
{
columnAssignmentPtr = new ColumnAssignment();
columnAssignmentPtr->fColumn = string(onUpdateTimeStampColumns[i]);
columnAssignmentPtr->fOperator = "=";
columnAssignmentPtr->fFuncScale = 0;
columnAssignmentPtr->fFromCol = false;
struct timeval tv;
char buf[64];
gettimeofday(&tv, 0);
MySQLTime time;
gmtSecToMySQLTime(tv.tv_sec, time, thd->variables.time_zone->get_name()->ptr());
sprintf(buf, "%04d-%02d-%02d %02d:%02d:%02d.%06ld", time.year, time.month, time.day, time.hour, time.minute, time.second, tv.tv_usec);
columnAssignmentPtr->fScalarExpression = buf;
colAssignmentListPtr->push_back ( columnAssignmentPtr );
}
}
}
else
{
@ -1492,6 +1735,7 @@ uint32_t doUpdateDelete(THD* thd)
pDMLPackage->set_TableName(tableName);
pDMLPackage->set_SchemaName(schemaName);
pDMLPackage->set_TimeZone(thd->variables.time_zone->get_name()->ptr());
pDMLPackage->set_IsFromCol( true );
//cout << " setting isFromCol to " << isFromCol << endl;
@ -1691,6 +1935,7 @@ uint32_t doUpdateDelete(THD* thd)
CalpontSystemCatalog::TableColName tcn = csc->colName(colrids[minWidthColOffset].objnum);
SimpleColumn* sc = new SimpleColumn(tcn.schema, tcn.table, tcn.column, csc->sessionID());
sc->tableAlias(aliasName);
sc->timeZone(thd->variables.time_zone->get_name()->ptr());
sc->resultType(csc->colType(colrids[minWidthColOffset].objnum));
SRCP srcp;
srcp.reset(sc);
@ -1820,6 +2065,7 @@ uint32_t doUpdateDelete(THD* thd)
//cout << "doUpdateDelete start new DMLProc client for ctrl-c " << " for session " << sessionID << endl;
VendorDMLStatement cmdStmt( "CTRL+C", DML_COMMAND, sessionID);
CalpontDMLPackage* pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(cmdStmt);
pDMLPackage->set_TimeZone(thd->variables.time_zone->get_name()->ptr());
ByteStream bytestream;
bytestream << static_cast<uint32_t>(sessionID);
pDMLPackage->write(bytestream);
@ -1941,6 +2187,7 @@ uint32_t doUpdateDelete(THD* thd)
{
VendorDMLStatement cmdStmt(command, DML_COMMAND, sessionID);
CalpontDMLPackage* pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(cmdStmt);
pDMLPackage->set_TimeZone(thd->variables.time_zone->get_name()->ptr());
pDMLPackage->setTableOid (ci->tableOid);
ByteStream bytestream;
bytestream << static_cast<uint32_t>(sessionID);
@ -3293,7 +3540,7 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table)
#ifdef _MSC_VER
aCmdLine = aCmdLine + "/bin/cpimport.exe -N -P " + to_string(localModuleId) + " -s " + ci->delimiter + " -e 0" + " -E " + escapechar + ci->enclosed_by + " ";
#else
aCmdLine = aCmdLine + "/bin/cpimport -m 1 -N -P " + boost::to_string(localModuleId) + " -s " + ci->delimiter + " -e 0" + " -E " + escapechar + ci->enclosed_by + " ";
aCmdLine = aCmdLine + "/bin/cpimport -m 1 -N -P " + boost::to_string(localModuleId) + " -s " + ci->delimiter + " -e 0" + " -T " + thd->variables.time_zone->get_name()->ptr() + " -E " + escapechar + ci->enclosed_by + " ";
#endif
}
}
@ -3302,7 +3549,7 @@ void ha_calpont_impl_start_bulk_insert(ha_rows rows, TABLE* table)
#ifdef _MSC_VER
aCmdLine = aCmdLine + "/bin/cpimport.exe -N -s " + ci->delimiter + " -e 0" + " -E " + escapechar + ci->enclosed_by + " ";
#else
aCmdLine = aCmdLine + "/bin/cpimport -m 1 -N -s " + ci->delimiter + " -e 0" + " -E " + escapechar + ci->enclosed_by + " ";
aCmdLine = aCmdLine + "/bin/cpimport -m 1 -N -s " + ci->delimiter + " -e 0" + " -T " + thd->variables.time_zone->get_name()->ptr() + " -E " + escapechar + ci->enclosed_by + " ";
#endif
}