You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-30 07:25:34 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			306 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			306 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (C) 2014 InfiniDB, Inc.
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or
 | |
|    modify it under the terms of the GNU General Public License
 | |
|    as published by the Free Software Foundation; version 2 of
 | |
|    the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | |
|    MA 02110-1301, USA. */
 | |
| 
 | |
| //  $Id: limitedorderby.cpp 9581 2013-05-31 13:46:14Z pleblanc $
 | |
| 
 | |
| #include <iostream>
 | |
| // #define NDEBUG
 | |
| #include <cassert>
 | |
| #include <string>
 | |
| using namespace std;
 | |
| 
 | |
| #include "errorids.h"
 | |
| #include "exceptclasses.h"
 | |
| using namespace logging;
 | |
| 
 | |
| #include "rowgroup.h"
 | |
| using namespace rowgroup;
 | |
| 
 | |
| #include "jlf_common.h"
 | |
| #include "limitedorderby.h"
 | |
| 
 | |
| using namespace ordering;
 | |
| 
 | |
| namespace joblist
 | |
| {
 | |
| const uint64_t LimitedOrderBy::fMaxUncommited = 102400;  // 100KiB - make it configurable?
 | |
| 
 | |
| // LimitedOrderBy class implementation
 | |
| LimitedOrderBy::LimitedOrderBy() : fStart(0), fCount(-1), fUncommitedMemory(0)
 | |
| {
 | |
|   fRule.fIdbCompare = this;
 | |
| }
 | |
| 
 | |
| LimitedOrderBy::~LimitedOrderBy()
 | |
| {
 | |
| }
 | |
| 
 | |
| void LimitedOrderBy::initialize(const RowGroup& rg, const JobInfo& jobInfo, bool invertRules,
 | |
|                                 bool isMultiThreaded)
 | |
| {
 | |
|   fRm = jobInfo.rm;
 | |
|   fSessionMemLimit = jobInfo.umMemLimit;
 | |
|   fErrorCode = ERR_ORDERBY_TOO_BIG;
 | |
| 
 | |
|   // locate column position in the rowgroup
 | |
|   map<uint32_t, uint32_t> keyToIndexMap;
 | |
| 
 | |
|   for (uint64_t i = 0; i < rg.getKeys().size(); ++i)
 | |
|   {
 | |
|     if (keyToIndexMap.find(rg.getKeys()[i]) == keyToIndexMap.end())
 | |
|       keyToIndexMap.insert(make_pair(rg.getKeys()[i], i));
 | |
|   }
 | |
| 
 | |
|   vector<pair<uint32_t, bool> >::const_iterator i = jobInfo.orderByColVec.begin();
 | |
| 
 | |
|   for (; i != jobInfo.orderByColVec.end(); i++)
 | |
|   {
 | |
|     map<uint32_t, uint32_t>::iterator j = keyToIndexMap.find(i->first);
 | |
|     idbassert(j != keyToIndexMap.end());
 | |
| 
 | |
|     fOrderByCond.push_back(IdbSortSpec(j->second, i->second ^ invertRules));
 | |
|   }
 | |
| 
 | |
|   // limit row count info
 | |
|   if (isMultiThreaded)
 | |
|   {
 | |
|     // CS can't apply offset at the first stage
 | |
|     // otherwise it looses records.
 | |
|     fStart = 0;
 | |
|     fCount = jobInfo.limitStart + jobInfo.limitCount;
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     fStart = jobInfo.limitStart;
 | |
|     fCount = jobInfo.limitCount;
 | |
|   }
 | |
| 
 | |
|   IdbOrderBy::initialize(rg);
 | |
| }
 | |
| 
 | |
| // This must return a proper number of key columns and
 | |
| // not just a column count.
 | |
| uint64_t LimitedOrderBy::getKeyLength() const
 | |
| {
 | |
|   return fRow0.getColumnCount();
 | |
| }
 | |
| 
 | |
| void LimitedOrderBy::processRow(const rowgroup::Row& row)
 | |
| {
 | |
|   // check if this is a distinct row
 | |
|   if (fDistinct && fDistinctMap->find(row.getPointer()) != fDistinctMap->end())
 | |
|     return;
 | |
| 
 | |
|   // @bug5312, limit count is 0, do nothing.
 | |
|   if (fCount == 0)
 | |
|     return;
 | |
| 
 | |
|   auto& orderByQueue = getQueue();
 | |
|   // if the row count is less than the limit
 | |
|   if (orderByQueue.size() < fStart + fCount)
 | |
|   {
 | |
|     copyRow(row, &fRow0);
 | |
|     OrderByRow newRow(fRow0, fRule);
 | |
|     orderByQueue.push(newRow);
 | |
| 
 | |
|     uint64_t memSizeInc = sizeof(newRow);
 | |
|     fUncommitedMemory += memSizeInc;
 | |
|     if (fUncommitedMemory >= fMaxUncommited)
 | |
|     {
 | |
|       if (!fRm->getMemory(fUncommitedMemory, fSessionMemLimit))
 | |
|       {
 | |
|         cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__;
 | |
|         throw IDBExcept(fErrorCode);
 | |
|       }
 | |
|       fMemSize += fUncommitedMemory;
 | |
|       fUncommitedMemory = 0;
 | |
|     }
 | |
| 
 | |
|     // add to the distinct map
 | |
|     if (fDistinct)
 | |
|       fDistinctMap->insert(fRow0.getPointer());
 | |
| 
 | |
|     fRowGroup.incRowCount();
 | |
|     fRow0.nextRow();
 | |
| 
 | |
|     if (fRowGroup.getRowCount() >= fRowsPerRG)
 | |
|     {
 | |
|       fDataQueue.push(fData);
 | |
|       uint64_t newSize = fRowGroup.getSizeWithStrings() - fRowGroup.getHeaderSize();
 | |
| 
 | |
|       if (!fRm->getMemory(newSize, fSessionMemLimit))
 | |
|       {
 | |
|         cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__;
 | |
|         throw IDBExcept(fErrorCode);
 | |
|       }
 | |
|       fMemSize += newSize;
 | |
| 
 | |
|       fData.reinit(fRowGroup, fRowsPerRG);
 | |
|       fRowGroup.setData(&fData);
 | |
|       fRowGroup.resetRowGroup(0);
 | |
|       fRowGroup.getRow(0, &fRow0);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   else if (fOrderByCond.size() > 0 && fRule.less(row.getPointer(), orderByQueue.top().fData))
 | |
|   {
 | |
|     OrderByRow swapRow = orderByQueue.top();
 | |
|     row1.setData(swapRow.fData);
 | |
|     copyRow(row, &row1);
 | |
| 
 | |
|     if (fDistinct)
 | |
|     {
 | |
|       fDistinctMap->erase(orderByQueue.top().fData);
 | |
|       fDistinctMap->insert(row1.getPointer());
 | |
|     }
 | |
| 
 | |
|     orderByQueue.pop();
 | |
|     orderByQueue.push(swapRow);
 | |
|   }
 | |
| }
 | |
| 
 | |
| /*
 | |
|  * The f() copies top element from an ordered queue into a row group. It
 | |
|  * does this backwards to syncronise sorting orientation with the server.
 | |
|  * The top row from the queue goes last into the returned set.
 | |
|  */
 | |
| void LimitedOrderBy::finalize()
 | |
| {
 | |
|   if (fUncommitedMemory > 0)
 | |
|   {
 | |
|     if (!fRm->getMemory(fUncommitedMemory, fSessionMemLimit))
 | |
|     {
 | |
|       cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__;
 | |
|       throw IDBExcept(fErrorCode);
 | |
|     }
 | |
|     fMemSize += fUncommitedMemory;
 | |
|     fUncommitedMemory = 0;
 | |
|   }
 | |
| 
 | |
|   queue<RGData> tempQueue;
 | |
|   if (fRowGroup.getRowCount() > 0)
 | |
|     fDataQueue.push(fData);
 | |
| 
 | |
|   auto& orderByQueue = getQueue();
 | |
| 
 | |
|   if (orderByQueue.size() > 0)
 | |
|   {
 | |
|     // *DRRTUY Very memory intensive. CS needs to account active
 | |
|     // memory only and release memory if needed.
 | |
|     uint64_t memSizeInc = fRowGroup.getSizeWithStrings() - fRowGroup.getHeaderSize();
 | |
| 
 | |
|     if (!fRm->getMemory(memSizeInc, fSessionMemLimit))
 | |
|     {
 | |
|       cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__;
 | |
|       throw IDBExcept(fErrorCode);
 | |
|     }
 | |
|     fMemSize += memSizeInc;
 | |
| 
 | |
|     uint64_t offset = 0;
 | |
|     uint64_t i = 0;
 | |
|     // Reduce queue size by an offset value if it applicable.
 | |
|     uint64_t queueSizeWoOffset = orderByQueue.size() > fStart ? orderByQueue.size() - fStart : 0;
 | |
|     list<RGData> tempRGDataList;
 | |
| 
 | |
|     if (fCount <= queueSizeWoOffset)
 | |
|     {
 | |
|       offset = fCount % fRowsPerRG;
 | |
|       if (!offset && fCount > 0)
 | |
|         offset = fRowsPerRG;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       offset = queueSizeWoOffset % fRowsPerRG;
 | |
|       if (!offset && queueSizeWoOffset > 0)
 | |
|         offset = fRowsPerRG;
 | |
|     }
 | |
| 
 | |
|     list<RGData>::iterator tempListIter = tempRGDataList.begin();
 | |
| 
 | |
|     i = 0;
 | |
|     uint32_t rSize = fRow0.getSize();
 | |
|     uint64_t preLastRowNumb = fRowsPerRG - 1;
 | |
|     fData.reinit(fRowGroup, fRowsPerRG);
 | |
|     fRowGroup.setData(&fData);
 | |
|     fRowGroup.resetRowGroup(0);
 | |
|     // *DRRTUY This approach won't work with
 | |
|     // OFSET > fRowsPerRG
 | |
|     offset = offset != 0 ? offset - 1 : offset;
 | |
|     fRowGroup.getRow(offset, &fRow0);
 | |
| 
 | |
|     while ((orderByQueue.size() > fStart) && (i++ < fCount))
 | |
|     {
 | |
|       const OrderByRow& topRow = orderByQueue.top();
 | |
|       row1.setData(topRow.fData);
 | |
|       copyRow(row1, &fRow0);
 | |
|       fRowGroup.incRowCount();
 | |
|       offset--;
 | |
|       fRow0.prevRow(rSize);
 | |
|       orderByQueue.pop();
 | |
| 
 | |
|       // if RG has fRowsPerRG rows
 | |
|       if (offset == (uint64_t)-1)
 | |
|       {
 | |
|         tempRGDataList.push_front(fData);
 | |
| 
 | |
|         if (!fRm->getMemory(memSizeInc, fSessionMemLimit))
 | |
|         {
 | |
|           cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__;
 | |
|           throw IDBExcept(fErrorCode);
 | |
|         }
 | |
|         fMemSize += memSizeInc;
 | |
| 
 | |
|         fData.reinit(fRowGroup, fRowsPerRG);
 | |
|         fRowGroup.setData(&fData);
 | |
|         fRowGroup.resetRowGroup(0);  // ?
 | |
|         fRowGroup.getRow(preLastRowNumb, &fRow0);
 | |
|         offset = preLastRowNumb;
 | |
|       }
 | |
|     }
 | |
|     // Push the last/only group into the queue.
 | |
|     if (fRowGroup.getRowCount() > 0)
 | |
|       tempRGDataList.push_front(fData);
 | |
| 
 | |
|     for (tempListIter = tempRGDataList.begin(); tempListIter != tempRGDataList.end(); tempListIter++)
 | |
|       tempQueue.push(*tempListIter);
 | |
| 
 | |
|     fDataQueue = tempQueue;
 | |
|   }
 | |
| }
 | |
| 
 | |
| const string LimitedOrderBy::toString() const
 | |
| {
 | |
|   ostringstream oss;
 | |
|   oss << "OrderBy   cols: ";
 | |
|   vector<IdbSortSpec>::const_iterator i = fOrderByCond.begin();
 | |
| 
 | |
|   for (; i != fOrderByCond.end(); i++)
 | |
|     oss << "(" << i->fIndex << "," << ((i->fAsc) ? "Asc" : "Desc") << ","
 | |
|         << ((i->fNf) ? "null first" : "null last") << ") ";
 | |
| 
 | |
|   oss << " start-" << fStart << " count-" << fCount;
 | |
| 
 | |
|   if (fDistinct)
 | |
|     oss << " distinct";
 | |
| 
 | |
|   oss << endl;
 | |
| 
 | |
|   return oss.str();
 | |
| }
 | |
| 
 | |
| }  // namespace joblist
 |