You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
MCOL-641 Basic support for updates.
This commit is contained in:
committed by
Roman Nozdrin
parent
f73de30427
commit
b07db9a8f4
@ -1502,6 +1502,17 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
|
||||
isFromCol = true;
|
||||
columnAssignmentPtr->fFromCol = true;
|
||||
Item_field* setIt = reinterpret_cast<Item_field*> (value);
|
||||
|
||||
// Minor optimization:
|
||||
// do not perform updates of the form "update t1 set a = a;"
|
||||
if (!strcmp(item->name.str, setIt->name.str)
|
||||
&& item->table_name && setIt->table_name && !strcmp(item->table_name, setIt->table_name)
|
||||
&& item->db_name && setIt->db_name && !strcmp(item->db_name, setIt->db_name))
|
||||
{
|
||||
delete columnAssignmentPtr;
|
||||
continue;
|
||||
}
|
||||
|
||||
string sectableName = string(setIt->table_name.str);
|
||||
|
||||
if ( setIt->db_name.str ) //derived table
|
||||
@ -1609,6 +1620,13 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
|
||||
ci->stats.fQueryType = updateCP->queryType();
|
||||
}
|
||||
|
||||
// Exit early if there is nothing to update
|
||||
if (colAssignmentListPtr->empty())
|
||||
{
|
||||
ci->affectedRows = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
//save table oid for commit/rollback to use
|
||||
uint32_t sessionID = tid2sid(thd->thread_id);
|
||||
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
|
||||
@ -1645,7 +1663,6 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
|
||||
|
||||
TableName* qualifiedTablName = new TableName();
|
||||
|
||||
|
||||
UpdateSqlStatement updateStmt;
|
||||
//@Bug 2753. To make sure the momory is freed.
|
||||
updateStmt.fColAssignmentListPtr = colAssignmentListPtr;
|
||||
|
@ -1382,6 +1382,7 @@ DataConvert::convertColumnData(const CalpontSystemCatalog::ColType& colType,
|
||||
// Simplest form of a template will use colType and width as a parameter
|
||||
// There will be lots specializations
|
||||
case CalpontSystemCatalog::DECIMAL:
|
||||
// TODO MCOL-641 implement decimal38 version of number_int_value
|
||||
if (colType.colWidth == 16)
|
||||
{
|
||||
int128_t bigint;
|
||||
|
@ -2631,6 +2631,7 @@ uint8_t WE_DMLCommandProc::rollbackBatchAutoOff(messageqcpp::ByteStream& bs, std
|
||||
//Rollbacked all versioned blocks
|
||||
return rc;
|
||||
}
|
||||
|
||||
uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
|
||||
std::string& err,
|
||||
ByteStream::quadbyte& PMId,
|
||||
@ -2804,54 +2805,6 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
|
||||
|
||||
for (unsigned int j = 0; j < columnsUpdated.size(); j++)
|
||||
{
|
||||
/* WriteEngine::ColTupleList colTupleList;
|
||||
//timer.start("lookupsyscat");
|
||||
tableColName.column = columnsUpdated[j]->get_Name();
|
||||
try
|
||||
{
|
||||
oid = systemCatalogPtr->lookupOID(tableColName);
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
rc = 1;
|
||||
ostringstream oss;
|
||||
oss << "lookupOID got exception " << ex.what() << " with column " << tableColName.schema << "." << tableColName.table << "." << tableColName.column;
|
||||
err = oss.str();
|
||||
}
|
||||
catch ( ... )
|
||||
{
|
||||
rc = 1;
|
||||
ostringstream oss;
|
||||
oss << "lookupOID got unknown exception with column " << tableColName.schema << "." << tableColName.table << "." << tableColName.column;
|
||||
err = oss.str();
|
||||
}
|
||||
|
||||
if (rc != 0)
|
||||
return rc;
|
||||
|
||||
CalpontSystemCatalog::ColType colType;
|
||||
try
|
||||
{
|
||||
colType = systemCatalogPtr->colType(oid);
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
rc = 1;
|
||||
ostringstream oss;
|
||||
oss << "colType got exception " << ex.what() << " with column oid " << oid;
|
||||
err = oss.str();
|
||||
}
|
||||
catch ( ... )
|
||||
{
|
||||
rc = 1;
|
||||
ostringstream oss;
|
||||
oss << "colType got unknown exception with column oid " << oid;
|
||||
err = oss.str();
|
||||
}
|
||||
|
||||
if (rc !=0)
|
||||
return rc;
|
||||
*/
|
||||
WriteEngine::ColTupleList colTupleList;
|
||||
CalpontSystemCatalog::ColType colType = colTypes[j];
|
||||
oid = oids[j];
|
||||
@ -2874,13 +2827,15 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
|
||||
rid = relativeRID;
|
||||
convertToRelativeRid (rid, extentNum, blockNum);
|
||||
rowIDLists.push_back(rid);
|
||||
uint32_t colWidth = (colTypes[j].colWidth > 8 ? 8 : colTypes[j].colWidth);
|
||||
uint32_t colWidth = ((colTypes[j].colWidth > 8 &&
|
||||
!(colTypes[j].colDataType == CalpontSystemCatalog::DECIMAL ||
|
||||
colTypes[j].colDataType == CalpontSystemCatalog::UDECIMAL)) ? 8 : colTypes[j].colWidth);
|
||||
int rrid = (int) relativeRID / (BYTE_PER_BLOCK / colWidth);
|
||||
// populate stats.blocksChanged
|
||||
if (rrid > preBlkNums[j])
|
||||
{
|
||||
preBlkNums[j] = rrid ;
|
||||
blocksChanged++;
|
||||
preBlkNums[j] = rrid ;
|
||||
blocksChanged++;
|
||||
}
|
||||
|
||||
}
|
||||
@ -3044,12 +2999,22 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
|
||||
case CalpontSystemCatalog::DECIMAL:
|
||||
case CalpontSystemCatalog::UDECIMAL:
|
||||
{
|
||||
// WIP MCOL-641
|
||||
// decimal width > 8 cannot be stored in an integer
|
||||
if (fetchColColwidths[fetchColPos] > 8)
|
||||
{
|
||||
value = row.getStringField(fetchColPos);
|
||||
unsigned i = strlen(value.c_str());
|
||||
value = value.substr(0, i);
|
||||
int128_t* dec;
|
||||
char buf[41];
|
||||
dec = row.getBinaryField<int128_t>(fetchColPos);
|
||||
dataconvert::DataConvert::decimalToString<int128_t>(dec,
|
||||
(unsigned)fetchColScales[fetchColPos], buf,
|
||||
sizeof(buf), fetchColTypes[fetchColPos]);
|
||||
|
||||
value = buf;
|
||||
|
||||
//value = row.getStringField(fetchColPos);
|
||||
//unsigned i = strlen(value.c_str());
|
||||
//value = value.substr(0, i);
|
||||
break;
|
||||
}
|
||||
|
||||
@ -3403,12 +3368,22 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
|
||||
case CalpontSystemCatalog::DECIMAL:
|
||||
case CalpontSystemCatalog::UDECIMAL:
|
||||
{
|
||||
// WIP MCOL-641
|
||||
// decimal width > 8 cannot be stored in an integer
|
||||
if (fetchColColwidths[fetchColPos] > 8)
|
||||
{
|
||||
value = row.getStringField(fetchColPos);
|
||||
unsigned i = strlen(value.c_str());
|
||||
value = value.substr(0, i);
|
||||
int128_t* dec;
|
||||
char buf[41];
|
||||
dec = row.getBinaryField<int128_t>(fetchColPos);
|
||||
dataconvert::DataConvert::decimalToString<int128_t>(dec,
|
||||
(unsigned)fetchColScales[fetchColPos], buf,
|
||||
sizeof(buf), fetchColTypes[fetchColPos]);
|
||||
|
||||
value = buf;
|
||||
|
||||
//value = row.getStringField(fetchColPos);
|
||||
//unsigned i = strlen(value.c_str());
|
||||
//value = value.substr(0, i);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -1733,9 +1733,6 @@ int ColumnOp::writeRow(Column& curCol, uint64_t totalRow, const RID* rowIdArray,
|
||||
if (!bDelete) pVal = &((uint64_t*) valArray)[i];
|
||||
break;
|
||||
|
||||
// WIP
|
||||
//case WriteEngine::WR_INT128:
|
||||
|
||||
case WriteEngine::WR_BINARY:
|
||||
// WIP CSCCol type
|
||||
pVal = &((uint128_t*) valArray)[i];
|
||||
@ -1838,102 +1835,73 @@ int ColumnOp::writeRows(Column& curCol, uint64_t totalRow, const RIDList& ridLis
|
||||
|
||||
// This is a awkward way to convert void* and get ith element, I just don't have a good solution for that
|
||||
// How about pVal = valArray? You're always getting the 0'th element here anyways.
|
||||
switch (curCol.colType)
|
||||
// TODO MCOL-641 add support here
|
||||
// This branch does not seem to be called from anywhere
|
||||
if (!bDelete)
|
||||
{
|
||||
// case WriteEngine::WR_LONG : pVal = &((long *) valArray)[i]; break;
|
||||
case WriteEngine::WR_FLOAT :
|
||||
if (!bDelete) pVal = &((float*) valArray)[0];
|
||||
switch (curCol.colType)
|
||||
{
|
||||
case WriteEngine::WR_FLOAT :
|
||||
pVal = &((float*) valArray)[0];
|
||||
break;
|
||||
|
||||
//pOldVal = &((float *) oldValArray)[i];
|
||||
break;
|
||||
case WriteEngine::WR_DOUBLE :
|
||||
pVal = &((double*) valArray)[0];
|
||||
break;
|
||||
|
||||
case WriteEngine::WR_DOUBLE :
|
||||
if (!bDelete) pVal = &((double*) valArray)[0];
|
||||
|
||||
//pOldVal = &((double *) oldValArray)[i];
|
||||
break;
|
||||
|
||||
case WriteEngine::WR_VARBINARY : // treat same as char for now
|
||||
case WriteEngine::WR_BLOB :
|
||||
case WriteEngine::WR_TEXT :
|
||||
case WriteEngine::WR_CHAR :
|
||||
if (!bDelete)
|
||||
{
|
||||
case WriteEngine::WR_VARBINARY : // treat same as char for now
|
||||
case WriteEngine::WR_BLOB :
|
||||
case WriteEngine::WR_TEXT :
|
||||
case WriteEngine::WR_CHAR :
|
||||
memcpy(charTmpBuf, (char*)valArray, 8);
|
||||
pVal = charTmpBuf;
|
||||
}
|
||||
break;
|
||||
|
||||
//pOldVal = (char*)oldValArray + i*8;
|
||||
break;
|
||||
// case WriteEngine::WR_BIT : pVal = &((bool *) valArray)[i]; break;
|
||||
case WriteEngine::WR_SHORT :
|
||||
pVal = &((short*) valArray)[0];
|
||||
break;
|
||||
|
||||
// case WriteEngine::WR_BIT : pVal = &((bool *) valArray)[i]; break;
|
||||
case WriteEngine::WR_SHORT :
|
||||
if (!bDelete) pVal = &((short*) valArray)[0];
|
||||
case WriteEngine::WR_BYTE :
|
||||
pVal = &((char*) valArray)[0];
|
||||
break;
|
||||
|
||||
//pOldVal = &((short *) oldValArray)[i];
|
||||
break;
|
||||
case WriteEngine::WR_LONGLONG:
|
||||
pVal = &((long long*) valArray)[0];
|
||||
break;
|
||||
|
||||
case WriteEngine::WR_BYTE :
|
||||
if (!bDelete) pVal = &((char*) valArray)[0];
|
||||
case WriteEngine::WR_TOKEN:
|
||||
pVal = &((Token*) valArray)[0];
|
||||
break;
|
||||
|
||||
//pOldVal = &((char *) oldValArray)[i];
|
||||
break;
|
||||
case WriteEngine::WR_INT :
|
||||
case WriteEngine::WR_MEDINT :
|
||||
pVal = &((int*) valArray)[0];
|
||||
break;
|
||||
|
||||
case WriteEngine::WR_LONGLONG:
|
||||
if (!bDelete) pVal = &((long long*) valArray)[0];
|
||||
case WriteEngine::WR_USHORT :
|
||||
pVal = &((uint16_t*) valArray)[0];
|
||||
break;
|
||||
|
||||
//pOldVal = &((long long *) oldValArray)[i];
|
||||
break;
|
||||
case WriteEngine::WR_UBYTE :
|
||||
pVal = &((uint8_t*) valArray)[0];
|
||||
break;
|
||||
|
||||
case WriteEngine::WR_TOKEN:
|
||||
if (!bDelete) pVal = &((Token*) valArray)[0];
|
||||
case WriteEngine::WR_ULONGLONG:
|
||||
pVal = &((uint64_t*) valArray)[0];
|
||||
break;
|
||||
|
||||
//pOldVal = &((Token *) oldValArray)[i];
|
||||
break;
|
||||
case WriteEngine::WR_UINT :
|
||||
case WriteEngine::WR_UMEDINT :
|
||||
pVal = &((uint32_t*) valArray)[0];
|
||||
break;
|
||||
|
||||
case WriteEngine::WR_INT :
|
||||
case WriteEngine::WR_MEDINT :
|
||||
if (!bDelete) pVal = &((int*) valArray)[0];
|
||||
|
||||
//pOldVal = &((int *) oldValArray)[i];
|
||||
break;
|
||||
|
||||
case WriteEngine::WR_USHORT :
|
||||
if (!bDelete) pVal = &((uint16_t*) valArray)[0];
|
||||
|
||||
//pOldVal = &((uint16_t *) oldValArray)[i];
|
||||
break;
|
||||
|
||||
case WriteEngine::WR_UBYTE :
|
||||
if (!bDelete) pVal = &((uint8_t*) valArray)[0];
|
||||
|
||||
//pOldVal = &((uint8_t *) oldValArray)[i];
|
||||
break;
|
||||
|
||||
case WriteEngine::WR_ULONGLONG:
|
||||
if (!bDelete) pVal = &((uint64_t*) valArray)[0];
|
||||
|
||||
//pOldVal = &((uint64_t *) oldValArray)[i];
|
||||
break;
|
||||
|
||||
case WriteEngine::WR_UINT :
|
||||
case WriteEngine::WR_UMEDINT :
|
||||
if (!bDelete) pVal = &((uint32_t*) valArray)[0];
|
||||
|
||||
//pOldVal = &((uint32_t *) oldValArray)[i];
|
||||
break;
|
||||
|
||||
default :
|
||||
if (!bDelete) pVal = &((int*) valArray)[0];
|
||||
|
||||
//pOldVal = &((int *) oldValArray)[i];
|
||||
break;
|
||||
default :
|
||||
pVal = &((int*) valArray)[0];
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// This is the stuff to retrieve old value
|
||||
//memcpy(pOldVal, dataBuf + dataBio, curCol.colWidth);
|
||||
|
||||
if (bDelete)
|
||||
else
|
||||
{
|
||||
if (curCol.colType != WriteEngine::WR_BINARY)
|
||||
{
|
||||
@ -2084,6 +2052,10 @@ int ColumnOp::writeRowsValues(Column& curCol, uint64_t totalRow, const RIDList&
|
||||
pVal = &((uint32_t*) valArray)[i];
|
||||
break;
|
||||
|
||||
case WriteEngine::WR_BINARY:
|
||||
pVal = &((int128_t*) valArray)[i];
|
||||
break;
|
||||
|
||||
default :
|
||||
pVal = &((int*) valArray)[i];
|
||||
break;
|
||||
|
@ -4376,9 +4376,6 @@ int WriteEngineWrapper::updateColumnRecs(const TxnID& txnid,
|
||||
const int32_t tableOid)
|
||||
{
|
||||
//Mark extents invalid
|
||||
//int rc = 0;
|
||||
//if (colExtentsStruct[0].dataOid < 3000)
|
||||
//{
|
||||
vector<BRM::LBID_t> lbids;
|
||||
vector<CalpontSystemCatalog::ColDataType> colDataTypes;
|
||||
ColumnOp* colOp = NULL;
|
||||
@ -4414,15 +4411,13 @@ int WriteEngineWrapper::updateColumnRecs(const TxnID& txnid,
|
||||
|
||||
if (lbids.size() > 0)
|
||||
{
|
||||
// cout << "BRMWrapper::getInstance()->markExtentsInvalid(lbids); " << lbids.size() << " lbids" << endl;
|
||||
rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes);
|
||||
}
|
||||
|
||||
//}
|
||||
if ( m_opType != DELETE)
|
||||
if (m_opType != DELETE)
|
||||
m_opType = UPDATE;
|
||||
|
||||
rc = writeColumnRecords (txnid, cscColTypeList, colExtentsStruct, colValueList, ridLists, tableOid);
|
||||
rc = writeColumnRecords(txnid, cscColTypeList, colExtentsStruct, colValueList, ridLists, tableOid);
|
||||
m_opType = NOOP;
|
||||
return rc;
|
||||
}
|
||||
@ -4438,6 +4433,7 @@ int WriteEngineWrapper::writeColumnRecords(const TxnID& txnid,
|
||||
void* valArray = NULL;
|
||||
Column curCol;
|
||||
ColStruct curColStruct;
|
||||
CalpontSystemCatalog::ColType curColType;
|
||||
ColTupleList curTupleList;
|
||||
ColStructList::size_type totalColumn;
|
||||
ColStructList::size_type i;
|
||||
@ -4452,6 +4448,7 @@ int WriteEngineWrapper::writeColumnRecords(const TxnID& txnid,
|
||||
{
|
||||
valArray = NULL;
|
||||
curColStruct = colStructList[i];
|
||||
curColType = cscColTypeList[i];
|
||||
curTupleList = colValueList[i];
|
||||
ColumnOp* colOp = m_colOp[op(curColStruct.fCompressionType)];
|
||||
|
||||
@ -4568,11 +4565,10 @@ int WriteEngineWrapper::writeColumnRecords(const TxnID& txnid,
|
||||
valArray = (Token*) calloc(sizeof(Token), totalRow);
|
||||
break;
|
||||
|
||||
// WIP
|
||||
// WIP MCOL-641
|
||||
case WriteEngine::WR_BINARY:
|
||||
//case WriteEngine::WR_INT128:
|
||||
// Use column width and remove all C-casts from above
|
||||
valArray = calloc(totalRow, 16);
|
||||
valArray = calloc(totalRow, curColType.colWidth);
|
||||
break;
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user