1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-05 16:15:50 +03:00

MCOL-1446 CS sorting direction aligned with the server`s.

This commit is contained in:
Roman Nozdrin
2018-05-23 22:31:21 +03:00
committed by Roman Nozdrin
parent 69138ecf32
commit 77b52a6a32
2 changed files with 57 additions and 22 deletions

View File

@@ -171,15 +171,18 @@ void LimitedOrderBy::processRow(const rowgroup::Row& row)
} }
} }
/*
* The f() copies top element from an ordered queue into a row group. It
* does this backwards to syncronise sorting orientation with the server.
* The top row from the queue goes last into the returned set.
*/
void LimitedOrderBy::finalize() void LimitedOrderBy::finalize()
{ {
queue<RGData> tempQueue;
if (fRowGroup.getRowCount() > 0) if (fRowGroup.getRowCount() > 0)
fDataQueue.push(fData); fDataQueue.push(fData);
// MCOL-1052 The removed check effectively disables sorting, if (fOrderByQueue.size() > 0)
// since fStart = 0 if there is no OFFSET;
if (true)
{ {
uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize(); uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize();
fMemSize += newSize; fMemSize += newSize;
@@ -191,26 +194,48 @@ void LimitedOrderBy::finalize()
throw IDBExcept(fErrorCode); throw IDBExcept(fErrorCode);
} }
uint64_t offset = 0;
uint64_t i = 0;
list<RGData> tempRGDataList;
// Skip first LIMIT rows in the the RowGroup
if ( fCount <= fOrderByQueue.size() )
{
offset = fCount % fRowsPerRG;
if(!offset && fCount > 0)
offset = fRowsPerRG;
}
else
{
offset = fOrderByQueue.size() % fRowsPerRG;
if(!offset && fOrderByQueue.size() > 0)
offset = fRowsPerRG;
}
list<RGData>::iterator tempListIter = tempRGDataList.begin();
i = 0;
uint32_t rSize = fRow0.getSize();
uint64_t preLastRowNumb = fRowsPerRG - 1;
fData.reinit(fRowGroup, fRowsPerRG); fData.reinit(fRowGroup, fRowsPerRG);
fRowGroup.setData(&fData); fRowGroup.setData(&fData);
fRowGroup.resetRowGroup(0); fRowGroup.resetRowGroup(0);
fRowGroup.getRow(0, &fRow0); offset = offset != 0 ? offset - 1 : offset;
queue<RGData> tempQueue; fRowGroup.getRow(offset, &fRow0);
uint64_t i = 0;
while ((fOrderByQueue.size() > fStart) && (i++ < fCount)) while ((fOrderByQueue.size() > fStart) && (i++ < fCount))
{ {
const OrderByRow& topRow = fOrderByQueue.top(); const OrderByRow& topRow = fOrderByQueue.top();
row1.setData(topRow.fData); row1.setData(topRow.fData);
copyRow(row1, &fRow0); copyRow(row1, &fRow0);
//memcpy(fRow0.getData(), topRow.fData, fRow0.getSize());
fRowGroup.incRowCount(); fRowGroup.incRowCount();
fRow0.nextRow(); offset--;
fRow0.prevRow(rSize);
fOrderByQueue.pop(); fOrderByQueue.pop();
if (fRowGroup.getRowCount() >= fRowsPerRG) if(offset == (uint64_t)-1)
{ {
tempQueue.push(fData); tempRGDataList.push_front(fData);
fMemSize += newSize; fMemSize += newSize;
if (!fRm->getMemory(newSize, fSessionMemLimit)) if (!fRm->getMemory(newSize, fSessionMemLimit))
@@ -221,15 +246,18 @@ void LimitedOrderBy::finalize()
} }
fData.reinit(fRowGroup, fRowsPerRG); fData.reinit(fRowGroup, fRowsPerRG);
//fData.reset(new uint8_t[fRowGroup.getDataSize(fRowsPerRG)]);
fRowGroup.setData(&fData); fRowGroup.setData(&fData);
fRowGroup.resetRowGroup(0); fRowGroup.resetRowGroup(0); // ?
fRowGroup.getRow(0, &fRow0); fRowGroup.getRow(preLastRowNumb, &fRow0);
offset = preLastRowNumb;
} }
} }
// Push the last/only group into the queue.
if (fRowGroup.getRowCount() > 0) if (fRowGroup.getRowCount() > 0)
tempQueue.push(fData); tempRGDataList.push_front(fData);
for(tempListIter = tempRGDataList.begin(); tempListIter != tempRGDataList.end(); tempListIter++)
tempQueue.push(*tempListIter);
fDataQueue = tempQueue; fDataQueue = tempQueue;
} }

View File

@@ -355,6 +355,7 @@ public:
*/ */
template<int len> void setUintField_offset(uint64_t val, uint32_t offset); template<int len> void setUintField_offset(uint64_t val, uint32_t offset);
inline void nextRow(uint32_t size); inline void nextRow(uint32_t size);
inline void prevRow(uint32_t size, uint64_t number);
inline void setUintField(uint64_t val, uint32_t colIndex); inline void setUintField(uint64_t val, uint32_t colIndex);
template<int len> void setIntField(int64_t, uint32_t colIndex); template<int len> void setIntField(int64_t, uint32_t colIndex);
@@ -896,6 +897,12 @@ inline void Row::nextRow(uint32_t size)
data += size; data += size;
} }
inline void Row::prevRow(uint32_t size, uint64_t number = 1)
{
data -= size * number;
}
template<int len> template<int len>
inline void Row::setUintField(uint64_t val, uint32_t colIndex) inline void Row::setUintField(uint64_t val, uint32_t colIndex)
{ {