1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-641 Basic support for multi-value inserts, and deletes.

This commit is contained in:
Gagan Goel
2020-02-17 19:52:05 -05:00
committed by Roman Nozdrin
parent 55afcd8890
commit 93170c3b31
11 changed files with 266 additions and 1551 deletions

View File

@ -116,7 +116,7 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std:
RowList rows = tablePtr->get_RowList();
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypes;
WriteEngine::CSCTypesList cscColTypeList;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DctnryValueList dctnryValueList;
WriteEngine::ColValueList colValuesList;
@ -141,7 +141,7 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std:
Row* rowPtr = rows.at(0);
ColumnList columns = rowPtr->get_ColumnList();
unsigned int numcols = rowPtr->get_NumberOfColumns();
cscColTypes.reserve(numcols);
cscColTypeList.reserve(numcols);
// WIP
// We presume that DictCols number is low
colStructs.reserve(numcols);
@ -159,7 +159,6 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std:
CalpontSystemCatalog::ColType colType;
colType = systemCatalogPtr->colType(oid);
cscColTypes.push_back(colType);
WriteEngine::ColStruct colStruct;
colStruct.fColDbRoot = dbroot;
WriteEngine::DctnryStruct dctnryStruct;
@ -169,7 +168,7 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std:
colStruct.fCompressionType = colType.compressionType;
// Token
if ( isDictCol(colType) )
if (isDictCol(colType) )
{
// WIP Hardcoded value
colStruct.colWidth = 8;
@ -199,6 +198,7 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std:
colStructs.push_back(colStruct);
dctnryStructList.push_back(dctnryStruct);
cscColTypeList.push_back(colType);
++column_iterator;
}
@ -538,7 +538,7 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std:
if (colValuesList[0].size() > 0)
{
if (NO_ERROR !=
(error = fWEWrapper.insertColumnRec_Single(txnid.id, cscColTypes, colStructs, colValuesList, dctnryStructList, dicStringList, tableRoPair.objnum)))
(error = fWEWrapper.insertColumnRec_Single(txnid.id, cscColTypeList, colStructs, colValuesList, dctnryStructList, dicStringList, tableRoPair.objnum)))
{
if (error == ERR_BRM_DEAD_LOCK)
{
@ -842,6 +842,7 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std::
bool isInsertSelect = insertPkg.get_isInsertSelect();
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DctnryValueList dctnryValueList;
WriteEngine::ColValueList colValuesList;
@ -1046,7 +1047,7 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std::
colStruct.fCompressionType = colType.compressionType;
// Token
if ( isDictCol(colType) )
if (isDictCol(colType) )
{
colStruct.colWidth = 8;
colStruct.tokenFlag = true;
@ -1075,6 +1076,7 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std::
colStructs.push_back(colStruct);
dctnryStructList.push_back(dctnryStruct);
cscColTypeList.push_back(colType);
++column_iterator;
}
@ -1324,7 +1326,7 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std::
*/
if (NO_ERROR !=
(error = fWEWrapper.insertColumnRecs(txnid.id, colStructs, colValuesList, dctnryStructList, dicStringList,
(error = fWEWrapper.insertColumnRecs(txnid.id, cscColTypeList, colStructs, colValuesList, dctnryStructList, dicStringList,
dbRootExtTrackerVec, 0, bFirstExtentOnThisPM, isInsertSelect, isAutocommitOn, roPair.objnum, fIsFirstBatchPm)))
{
if (error == ERR_BRM_DEAD_LOCK)
@ -2691,6 +2693,7 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
WriteEngine::ColStruct colStruct;
WriteEngine::ColValueList colValueList;
WriteEngine::RIDList rowIDLists;
WriteEngine::CSCTypesList cscColTypeList;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DctnryStruct dctnryStruct;
@ -2858,7 +2861,7 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
colStruct.fCompressionType = colType.compressionType;
tableColName.column = columnsUpdated[j]->get_Name();
if ( !ridsFetched)
if (!ridsFetched)
{
// querystats
uint64_t relativeRID = 0;
@ -3806,12 +3809,13 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
colStructList.push_back(colStruct);
colValueList.push_back (colTupleList);
cscColTypeList.push_back(colType);
} //end of bulding values and column structure.
//timer.stop("fetch values");
if (rowIDLists.size() > 0)
{
error = fWEWrapper.updateColumnRecs(txnId, colStructList, colValueList, rowIDLists, tableRO.objnum);
error = fWEWrapper.updateColumnRecs(txnId, cscColTypeList, colStructList, colValueList, rowIDLists, tableRO.objnum);
}
if (error != NO_ERROR)
@ -4116,13 +4120,14 @@ uint8_t WE_DMLCommandProc::processDelete(messageqcpp::ByteStream& bs,
for (uint32_t j = 0; j < row.getColumnCount(); j++)
{
preBlkNums[j] = -1;
colWidth[j] = (row.getColumnWidth(j) >= 8 ? 8 : row.getColumnWidth(j));
colWidth[j] = (row.getColumnWidth(j) >= 16 ? 16 : row.getColumnWidth(j));
}
//Get the file information from rowgroup
dbRoot = rowGroups[txnId]->getDBRoot();
rowGroups[txnId]->getLocation(&partition, &segment, &extentNum, &blockNum);
WriteEngine::ColStructList colStructList;
WriteEngine::CSCTypesList cscColTypeList;
WriteEngine::ColStruct colStruct;
colStruct.fColPartition = partition;
colStruct.fColSegment = segment;
@ -4158,7 +4163,9 @@ uint8_t WE_DMLCommandProc::processDelete(messageqcpp::ByteStream& bs,
colStruct.tokenFlag = false;
colStruct.fCompressionType = colType.compressionType;
if (colType.colWidth > 8) //token
if (colType.colWidth > 8 &&
!(colType.colDataType == CalpontSystemCatalog::DECIMAL ||
colType.colDataType == CalpontSystemCatalog::UDECIMAL)) //token
{
colStruct.colWidth = 8;
colStruct.tokenFlag = true;
@ -4170,7 +4177,8 @@ uint8_t WE_DMLCommandProc::processDelete(messageqcpp::ByteStream& bs,
colStruct.colDataType = colType.colDataType;
colStructList.push_back( colStruct );
colStructList.push_back(colStruct);
cscColTypeList.push_back(colType);
}
}
catch (exception& ex)
@ -4181,13 +4189,15 @@ uint8_t WE_DMLCommandProc::processDelete(messageqcpp::ByteStream& bs,
}
std::vector<ColStructList> colExtentsStruct;
std::vector<CSCTypesList> colExtentsColType;
std::vector<void*> colOldValueList;
std::vector<RIDList> ridLists;
colExtentsStruct.push_back(colStructList);
colExtentsColType.push_back(cscColTypeList);
ridLists.push_back(rowIDList);
int error = 0;
error = fWEWrapper.deleteRow( txnId, colExtentsStruct, colOldValueList, ridLists, roPair.objnum );
error = fWEWrapper.deleteRow(txnId, colExtentsColType, colExtentsStruct, colOldValueList, ridLists, roPair.objnum);
if (error != NO_ERROR)
{