You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-11-03 17:13:17 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			1959 lines
		
	
	
		
			58 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			1959 lines
		
	
	
		
			58 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/* Copyright (C) 2014 InfiniDB, Inc.
   Copyright (C) 2019 MariaDB Corporation

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */
 | 
						|
 | 
						|
#include "tuplejoiner.h"
 | 
						|
#include <algorithm>
 | 
						|
#include <boost/thread/lock_types.hpp>
 | 
						|
#include <vector>
 | 
						|
#include <limits>
 | 
						|
#include <unordered_set>
 | 
						|
 | 
						|
#include "hasher.h"
 | 
						|
#include "lbidlist.h"
 | 
						|
#include "resourcemanager.h"
 | 
						|
#include "spinlock.h"
 | 
						|
#include "vlarray.h"
 | 
						|
#include "threadnaming.h"
 | 
						|
 | 
						|
using namespace std;
 | 
						|
using namespace rowgroup;
 | 
						|
using namespace utils;
 | 
						|
using namespace execplan;
 | 
						|
using namespace joblist;
 | 
						|
 | 
						|
namespace joiner
 | 
						|
{
 | 
						|
constexpr const size_t DEFAULT_BUCKET_COUNT = 10;
 | 
						|
 | 
						|
template <typename HashTable>
 | 
						|
std::unique_ptr<HashTable> makeHashMap(size_t bucketCount, ResourceManager* resourceManager)
 | 
						|
{
 | 
						|
  return std::unique_ptr<HashTable>(new HashTable(bucketCount, TupleJoiner::hasher(),
 | 
						|
                                                  typename HashTable::key_equal(),
 | 
						|
                                                  utils::STLPoolAllocator<typename HashTable::value_type>(resourceManager)));
 | 
						|
}
 | 
						|
 | 
						|
void TupleJoiner::initRowsVector()
 | 
						|
{
 | 
						|
  rows.reset(new RowPointersVec(resourceManager_->getAllocator<rowgroup::Row::Pointer>()));
 | 
						|
}
 | 
						|
 | 
						|
void TupleJoiner::initHashMaps(uint32_t& smallJoinColumn)
 | 
						|
{
 | 
						|
  if (typelessJoin)
 | 
						|
  {
 | 
						|
    for (size_t i = 0; i < bucketCount; i++)
 | 
						|
    {
 | 
						|
      ht.emplace_back(makeHashMap<typelesshash_t>(DEFAULT_BUCKET_COUNT, resourceManager_));
 | 
						|
    }
 | 
						|
  }
 | 
						|
  else if (smallRG.getColTypes()[smallJoinColumn] == CalpontSystemCatalog::LONGDOUBLE)
 | 
						|
  {
 | 
						|
    for (size_t i = 0; i < bucketCount; i++)
 | 
						|
    {
 | 
						|
      ld.emplace_back(makeHashMap<ldhash_t>(DEFAULT_BUCKET_COUNT, resourceManager_));
 | 
						|
    }
 | 
						|
  }
 | 
						|
  else if (smallRG.usesStringTable())
 | 
						|
  {
 | 
						|
    for (size_t i = 0; i < bucketCount; i++)
 | 
						|
    {
 | 
						|
      sth.emplace_back(makeHashMap<sthash_t>(DEFAULT_BUCKET_COUNT, resourceManager_));
 | 
						|
    }
 | 
						|
  }
 | 
						|
  else
 | 
						|
  {
 | 
						|
    for (size_t i = 0; i < bucketCount; i++)
 | 
						|
    {
 | 
						|
      h.emplace_back(makeHashMap<hash_t>(DEFAULT_BUCKET_COUNT, resourceManager_));
 | 
						|
    }
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
// Typed joiner ctor
 | 
						|
TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput,
 | 
						|
                         uint32_t smallJoinColumn, uint32_t largeJoinColumn, JoinType jt,
 | 
						|
                         threadpool::ThreadPool* jsThreadPool, joblist::ResourceManager* rm,
 | 
						|
                         const uint64_t numCores)
 | 
						|
 : smallRG(smallInput)
 | 
						|
 , largeRG(largeInput)
 | 
						|
 , joinAlg(INSERTING)
 | 
						|
 , joinType(jt)
 | 
						|
 , threadCount(1)
 | 
						|
 , typelessJoin(false)
 | 
						|
 , bSignedUnsignedJoin(false)
 | 
						|
 , uniqueLimit(100)
 | 
						|
 , finished(false)
 | 
						|
 , numCores(numCores)
 | 
						|
 , jobstepThreadPool(jsThreadPool)
 | 
						|
 , _convertToDiskJoin(false)
 | 
						|
 , resourceManager_(rm)
 | 
						|
{
 | 
						|
  initRowsVector();
 | 
						|
  getBucketCount();
 | 
						|
 | 
						|
  m_bucketLocks.reset(new boost::mutex[bucketCount]);
 | 
						|
 | 
						|
  initHashMaps(smallJoinColumn);
 | 
						|
 | 
						|
  smallRG.initRow(&smallNullRow);
 | 
						|
 | 
						|
  if (smallOuterJoin() || largeOuterJoin() || semiJoin() || antiJoin())
 | 
						|
  {
 | 
						|
    smallNullMemory = RGData(smallRG, 1);
 | 
						|
    smallRG.setData(&smallNullMemory);
 | 
						|
    smallRG.getRow(0, &smallNullRow);
 | 
						|
    smallNullRow.initToNull();
 | 
						|
  }
 | 
						|
 | 
						|
  smallKeyColumns.push_back(smallJoinColumn);
 | 
						|
  largeKeyColumns.push_back(largeJoinColumn);
 | 
						|
  discreteValues.reset(new bool[1]);
 | 
						|
  cpValues.reset(new vector<int128_t>[1]);
 | 
						|
  discreteValues[0] = false;
 | 
						|
 | 
						|
  if (smallRG.isUnsigned(smallKeyColumns[0]))
 | 
						|
  {
 | 
						|
    if (datatypes::isWideDecimalType(smallRG.getColType(smallKeyColumns[0]),
 | 
						|
                                     smallRG.getColumnWidth(smallKeyColumns[0])))
 | 
						|
    {
 | 
						|
      cpValues[0].push_back((int128_t)-1);
 | 
						|
      cpValues[0].push_back(0);
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
      cpValues[0].push_back((int128_t)numeric_limits<uint64_t>::max());
 | 
						|
      cpValues[0].push_back(0);
 | 
						|
    }
 | 
						|
  }
 | 
						|
  else
 | 
						|
  {
 | 
						|
    if (datatypes::isWideDecimalType(smallRG.getColType(smallKeyColumns[0]),
 | 
						|
                                     smallRG.getColumnWidth(smallKeyColumns[0])))
 | 
						|
    {
 | 
						|
      cpValues[0].push_back(utils::maxInt128);
 | 
						|
      cpValues[0].push_back(utils::minInt128);
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
      cpValues[0].push_back((int128_t)numeric_limits<int64_t>::max());
 | 
						|
      cpValues[0].push_back((int128_t)numeric_limits<int64_t>::min());
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  if (smallRG.isUnsigned(smallJoinColumn) != largeRG.isUnsigned(largeJoinColumn))
 | 
						|
    bSignedUnsignedJoin = true;
 | 
						|
 | 
						|
  nullValueForJoinColumn = smallNullRow.getSignedNullValue(smallJoinColumn);
 | 
						|
}
 | 
						|
 | 
						|
// Typeless joiner ctor
 | 
						|
TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput,
 | 
						|
                         const vector<uint32_t>& smallJoinColumns, const vector<uint32_t>& largeJoinColumns,
 | 
						|
                         JoinType jt, threadpool::ThreadPool* jsThreadPool, joblist::ResourceManager* rm,
 | 
						|
                         const uint64_t numCores)
 | 
						|
 : smallRG(smallInput)
 | 
						|
 , largeRG(largeInput)
 | 
						|
 , joinAlg(INSERTING)
 | 
						|
 , joinType(jt)
 | 
						|
 , threadCount(1)
 | 
						|
 , typelessJoin(true)
 | 
						|
 , smallKeyColumns(smallJoinColumns)
 | 
						|
 , largeKeyColumns(largeJoinColumns)
 | 
						|
 , bSignedUnsignedJoin(false)
 | 
						|
 , uniqueLimit(100)
 | 
						|
 , finished(false)
 | 
						|
 , numCores(numCores)
 | 
						|
 , jobstepThreadPool(jsThreadPool)
 | 
						|
 , _convertToDiskJoin(false)
 | 
						|
 , resourceManager_(rm)
 | 
						|
{
 | 
						|
  uint i;
 | 
						|
 | 
						|
  initRowsVector();
 | 
						|
  getBucketCount();
 | 
						|
 | 
						|
  uint32_t unused = 0;
 | 
						|
  // Unused b/c this is a typeless joiner
 | 
						|
  initHashMaps(unused);
 | 
						|
 | 
						|
  m_bucketLocks.reset(new boost::mutex[bucketCount]);
 | 
						|
 | 
						|
  smallRG.initRow(&smallNullRow);
 | 
						|
 | 
						|
  if (smallOuterJoin() || largeOuterJoin() || semiJoin() || antiJoin())
 | 
						|
  {
 | 
						|
    smallNullMemory = RGData(smallRG, 1);
 | 
						|
    smallRG.setData(&smallNullMemory);
 | 
						|
    smallRG.getRow(0, &smallNullRow);
 | 
						|
    smallNullRow.initToNull();
 | 
						|
  }
 | 
						|
 | 
						|
  keyLength = calculateKeyLength(smallKeyColumns, smallRG, &largeKeyColumns, &largeRG);
 | 
						|
 | 
						|
  discreteValues.reset(new bool[smallKeyColumns.size()]);
 | 
						|
  cpValues.reset(new vector<int128_t>[smallKeyColumns.size()]);
 | 
						|
 | 
						|
  for (i = 0; i < smallKeyColumns.size(); ++i)
 | 
						|
  {
 | 
						|
    uint32_t smallKeyColumnsIdx = smallKeyColumns[i];
 | 
						|
    auto smallSideColType = smallRG.getColTypes()[smallKeyColumnsIdx];
 | 
						|
    // Set bSignedUnsignedJoin if one or more join columns are signed to unsigned compares.
 | 
						|
    if (smallRG.isUnsigned(smallKeyColumnsIdx) != largeRG.isUnsigned(largeKeyColumns[i]))
 | 
						|
    {
 | 
						|
      bSignedUnsignedJoin = true;
 | 
						|
    }
 | 
						|
 | 
						|
    discreteValues[i] = false;
 | 
						|
    if (isUnsigned(smallSideColType))
 | 
						|
    {
 | 
						|
      cpValues[i].push_back((int128_t)numeric_limits<uint64_t>::max());
 | 
						|
      cpValues[i].push_back(0);
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
      if (datatypes::isWideDecimalType(smallSideColType, smallRG.getColumnWidth(smallKeyColumnsIdx)))
 | 
						|
      {
 | 
						|
        cpValues[i].push_back(utils::maxInt128);
 | 
						|
        cpValues[i].push_back(utils::minInt128);
 | 
						|
      }
 | 
						|
      else
 | 
						|
      {
 | 
						|
        cpValues[i].push_back(numeric_limits<int64_t>::max());
 | 
						|
        cpValues[i].push_back(numeric_limits<int64_t>::min());
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  // note, 'numcores' is implied by tuplehashjoin on calls to insertRGData().
 | 
						|
  // TODO: make it explicit to avoid future confusion.
 | 
						|
  for (i = 0; i < (uint)numCores; i++)
 | 
						|
  {
 | 
						|
    auto alloc = resourceManager_->getAllocator<utils::FixedAllocatorBufType>();
 | 
						|
    storedKeyAlloc.emplace_back(FixedAllocator(alloc, keyLength));
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
// Default ctor: performs no explicit initialisation of its own.
TupleJoiner::TupleJoiner() = default;
 | 
						|
 | 
						|
// Dtor: replaces the NULL-row buffer with an empty RGData before the members
// are destroyed.
TupleJoiner::~TupleJoiner()
{
  this->smallNullMemory = RGData();
}
 | 
						|
 | 
						|
bool TupleJoiner::operator<(const TupleJoiner& tj) const
 | 
						|
{
 | 
						|
  return size() < tj.size();
 | 
						|
}
 | 
						|
 | 
						|
void TupleJoiner::getBucketCount()
 | 
						|
{
 | 
						|
  bucketCount = (numCores == 1 ? 1 : (1 << (32 - __builtin_clz(numCores - 1))));
 | 
						|
  bucketMask = bucketCount - 1;
 | 
						|
}
 | 
						|
 | 
						|
template <typename buckets_t, typename hash_table_t>
 | 
						|
void TupleJoiner::bucketsToTables(buckets_t* buckets, hash_table_t& tables)
 | 
						|
{
 | 
						|
  uint i;
 | 
						|
 | 
						|
  bool done = false, wasProductive;
 | 
						|
  while (!done && !wasAborted_)
 | 
						|
  {
 | 
						|
    done = true;
 | 
						|
    wasProductive = false;
 | 
						|
    for (i = 0; i < bucketCount; i++)
 | 
						|
    {
 | 
						|
      if (buckets[i].empty())
 | 
						|
        continue;
 | 
						|
      {
 | 
						|
        boost::unique_lock<boost::mutex> lock(m_bucketLocks[i], boost::try_to_lock);
 | 
						|
        if (!lock.owns_lock())
 | 
						|
        {
 | 
						|
          done = false;
 | 
						|
          continue;
 | 
						|
        }
 | 
						|
        tables[i]->insert(buckets[i].begin(), buckets[i].end());
 | 
						|
      }
 | 
						|
 | 
						|
      wasProductive = true;
 | 
						|
      buckets[i].clear();
 | 
						|
    }
 | 
						|
    // TODO use CV here instead of busy sleep
 | 
						|
    if (!done && !wasProductive)
 | 
						|
      ::usleep(1000 * numCores);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
// Inserts `rowCount` consecutive rows, starting at `r`, into the typeless hash
// tables (`ht`).
//
// threadID  selects the key allocator in storedKeyAlloc used to build the keys.
// r         is advanced with nextRow() once per processed row.
//
// Rows whose generated key has length 0 are skipped.
void TupleJoiner::um_insertTypeless(uint threadID, uint rowCount, Row& r)
{
  using KeyedRow = pair<TypelessData, Row::Pointer>;

  utils::VLArray<TypelessData> keys(rowCount);
  utils::VLArray<vector<KeyedRow>> staged(bucketCount);
  FixedAllocator* keyAlloc = &storedKeyAlloc[threadID];

  for (uint rowIdx = 0; rowIdx < rowCount; ++rowIdx, r.nextRow())
  {
    keys[rowIdx] = makeTypelessKey(r, smallKeyColumns, keyLength, keyAlloc, largeRG, largeKeyColumns);

    if (keys[rowIdx].len == 0)
      continue;

    uint bucket = bucketPicker((char*)keys[rowIdx].data, keys[rowIdx].len, bpSeed) & bucketMask;
    staged[bucket].emplace_back(KeyedRow(keys[rowIdx], r.getPointer()));
  }

  bucketsToTables(&staged[0], ht);
}
 | 
						|
 | 
						|
// Inserts `rowCount` consecutive rows, starting at `r`, into the long double
// hash tables (`ld`), keyed by the first small-side key column.
//
// r is advanced with nextRow() once per processed row. The bucket is chosen by
// hashing the first 10 bytes of the key read from the row.
void TupleJoiner::um_insertLongDouble(uint rowCount, Row& r)
{
  using KeyedRow = pair<long double, Row::Pointer>;

  utils::VLArray<vector<KeyedRow>> staged(bucketCount);
  const uint keyColumn = smallKeyColumns[0];

  for (uint rowIdx = 0; rowIdx < rowCount; ++rowIdx, r.nextRow())
  {
    long double smallKey = r.getLongDoubleField(keyColumn);
    uint bucket = bucketPicker((char*)&smallKey, 10, bpSeed) &
                  bucketMask;  // change if we decide to support windows again

    // NULL keys are stored under the canonical LONGDOUBLENULL constant.
    long double storedKey = smallKey;

    if (UNLIKELY(smallKey == joblist::LONGDOUBLENULL))
      storedKey = joblist::LONGDOUBLENULL;

    staged[bucket].emplace_back(KeyedRow(storedKey, r.getPointer()));
  }

  bucketsToTables(&staged[0], ld);
}
 | 
						|
 | 
						|
// Inserts `rowCount` consecutive rows, starting at `r`, into the integer-keyed
// hash tables (`h`), which map the key to the row's raw data pointer.
//
// r is advanced with nextRow() once per processed row. The key is read from
// the first small-side key column, through the unsigned accessor when the
// column is unsigned. A key equal to nullValueForJoinColumn is stored under
// getJoinNullValue(); the bucket is always picked from the key as read.
void TupleJoiner::um_insertInlineRows(uint rowCount, Row& r)
{
  using KeyedRow = pair<int64_t, uint8_t*>;

  utils::VLArray<vector<KeyedRow>> staged(bucketCount);
  const uint keyColumn = smallKeyColumns[0];

  for (uint rowIdx = 0; rowIdx < rowCount; ++rowIdx, r.nextRow())
  {
    int64_t smallKey;

    if (r.isUnsigned(keyColumn))
      smallKey = (int64_t)r.getUintField(keyColumn);
    else
      smallKey = r.getIntField(keyColumn);

    uint bucket = bucketPicker((char*)&smallKey, sizeof(smallKey), bpSeed) & bucketMask;

    int64_t storedKey = smallKey;

    if (UNLIKELY(smallKey == nullValueForJoinColumn))
      storedKey = getJoinNullValue();

    staged[bucket].emplace_back(KeyedRow(storedKey, r.getData()));
  }

  bucketsToTables(&staged[0], h);
}
 | 
						|
 | 
						|
// Inserts `rowCount` consecutive rows, starting at `r`, into the string-table
// hash tables (`sth`), which map the integer key to a Row::Pointer.
//
// r is advanced with nextRow() once per processed row. The key is read from
// the first small-side key column, through the unsigned accessor when the
// column is unsigned. A key equal to nullValueForJoinColumn is stored under
// getJoinNullValue(); the bucket is always picked from the key as read.
void TupleJoiner::um_insertStringTable(uint rowCount, Row& r)
{
  using KeyedRow = pair<int64_t, Row::Pointer>;

  utils::VLArray<vector<KeyedRow>> staged(bucketCount);
  const uint keyColumn = smallKeyColumns[0];

  for (uint rowIdx = 0; rowIdx < rowCount; ++rowIdx, r.nextRow())
  {
    int64_t smallKey;

    if (r.isUnsigned(keyColumn))
      smallKey = (int64_t)r.getUintField(keyColumn);
    else
      smallKey = r.getIntField(keyColumn);

    uint bucket = bucketPicker((char*)&smallKey, sizeof(smallKey), bpSeed) & bucketMask;

    int64_t storedKey = smallKey;

    if (UNLIKELY(smallKey == nullValueForJoinColumn))
      storedKey = getJoinNullValue();

    staged[bucket].emplace_back(KeyedRow(storedKey, r.getPointer()));
  }

  bucketsToTables(&staged[0], sth);
}
 | 
						|
 | 
						|
// Inserts every row of `rg` into the joiner.
//
// rg        row group holding the small-side rows to add.
// threadID  forwarded to the typeless insert path.
//
// First pass (under m_cpValuesLock): updates the casual partitioning data for
// each row and zeroes its RID. Second pass: in UM mode the rows go to the hash
// tables matching the key kind; otherwise their pointers are appended to `rows`.
void TupleJoiner::insertRGData(RowGroup& rg, uint threadID)
{
  Row r;
  rg.initRow(&r);
  const uint rowCount = rg.getRowCount();

  rg.getRow(0, &r);
  {
    boost::unique_lock<boost::mutex> lock(m_cpValuesLock);

    for (uint n = 0; n < rowCount; ++n, r.nextRow())
    {
      updateCPData(r);
      r.zeroRid();
    }
  }

  // Rewind to the first row for the insertion pass.
  rg.getRow(0, &r);

  if (joinAlg != UM)
  {
    // while in PM-join mode, inserting is single-threaded
    for (uint n = 0; n < rowCount; ++n, r.nextRow())
      rows->push_back(r.getPointer());

    return;
  }

  if (typelessJoin)
  {
    um_insertTypeless(threadID, rowCount, r);
    return;
  }

  if (r.getColType(smallKeyColumns[0]) == execplan::CalpontSystemCatalog::LONGDOUBLE)
  {
    um_insertLongDouble(rowCount, r);
    return;
  }

  if (smallRG.usesStringTable())
    um_insertStringTable(rowCount, r);
  else
    um_insertInlineRows(rowCount, r);
}
 | 
						|
 | 
						|
// Inserts a single small-side row.
//
// r           the row to add.
// zeroTheRid  when true the row's RID is zeroed first. When doing a disk-based
//             join, only the first iteration on the large side will zero the
//             RID; the successive iterations need it unchanged.
//
// In UM mode the row goes into the hash table for its key kind and bucket;
// otherwise its pointer is appended to `rows`.
void TupleJoiner::insert(Row& r, bool zeroTheRid)
{
  if (zeroTheRid)
    r.zeroRid();

  updateCPData(r);

  if (joinAlg != UM)
  {
    rows->push_back(r.getPointer());
    return;
  }

  if (typelessJoin)
  {
    TypelessData td =
        makeTypelessKey(r, smallKeyColumns, keyLength, &storedKeyAlloc[0], largeRG, largeKeyColumns);

    // Rows whose key has length 0 are not inserted.
    if (td.len > 0)
    {
      uint bucket = bucketPicker((char*)td.data, td.len, bpSeed) & bucketMask;
      ht[bucket]->insert(pair<TypelessData, Row::Pointer>(td, r.getPointer()));
    }

    return;
  }

  if (r.getColType(smallKeyColumns[0]) == execplan::CalpontSystemCatalog::LONGDOUBLE)
  {
    long double smallKey = r.getLongDoubleField(smallKeyColumns[0]);
    uint bucket = bucketPicker((char*)&smallKey, 10, bpSeed) &
                  bucketMask;  // change if we decide to support windows again

    // NULL keys are stored under the canonical LONGDOUBLENULL constant.
    long double storedKey = smallKey;

    if (UNLIKELY(smallKey == joblist::LONGDOUBLENULL))
      storedKey = joblist::LONGDOUBLENULL;

    ld[bucket]->insert(pair<long double, Row::Pointer>(storedKey, r.getPointer()));
    return;
  }

  // Integer key: the two remaining table kinds differ only in the stored row handle.
  const bool inlineRows = !smallRG.usesStringTable();
  int64_t smallKey;

  if (r.isUnsigned(smallKeyColumns[0]))
    smallKey = (int64_t)r.getUintField(smallKeyColumns[0]);
  else
    smallKey = r.getIntField(smallKeyColumns[0]);

  uint bucket = bucketPicker((char*)&smallKey, sizeof(smallKey), bpSeed) & bucketMask;

  // A NULL key is stored under getJoinNullValue(); the bucket comes from the key as read.
  int64_t storedKey = smallKey;

  if (UNLIKELY(smallKey == nullValueForJoinColumn))
    storedKey = getJoinNullValue();

  if (inlineRows)
    h[bucket]->insert(pair<int64_t, uint8_t*>(storedKey, r.getData()));
  else
    sth[bucket]->insert(pair<int64_t, Row::Pointer>(storedKey, r.getPointer()));
}
 | 
						|
 | 
						|
// Collects the small-side rows that match one large-side row.
//
// largeSideRow   the large-side row being probed.
// largeRowIndex  index of that row; used to look up PM join results.
// threadID       selects per-thread state (PM results, temporary key allocator).
// matches        output; cleared first, then filled with pointers to matching rows.
//
// Stages, in order:
//   1. PM mode: translate the precomputed indexes in pmJoinResults to row
//      pointers. UM mode with a non-NULL key: probe the hash table for the key kind.
//   2. Large outer join with no match: add the small-side NULL row.
//   3. MATCHNULLS with a non-NULL, typed key: add the rows stored under the NULL key.
//   4. MATCHNULLS anti join with a NULL key: add every stored row (Bug 3524).
//
// Some UM lookups return early when nothing is found and the join is neither
// LARGEOUTER nor MATCHNULLS, so the later stages are skipped in that case.
void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uint32_t threadID,
                        vector<Row::Pointer>* matches)
{
  // Number of leading bytes of a long double fed to bucketPicker. Insertion hashes
  // exactly this many bytes, so every lookup must do the same or it may probe a
  // different bucket. (change if we decide to support windows again)
  const int longDoubleHashLen = 10;

  bool isNull = hasNullJoinColumn(largeSideRow);
  matches->clear();

  if (inPM())
  {
    vector<uint32_t>& v = pmJoinResults[threadID][largeRowIndex];
    uint32_t size = v.size();

    // Indexes beyond the stored rows are ignored.
    for (uint32_t i = 0; i < size; i++)
      if (v[i] < rows->size())
        matches->push_back((*rows)[v[i]]);

    if (UNLIKELY((semiJoin() || antiJoin()) && matches->size() == 0))
      matches->push_back(smallNullRow.getPointer());
  }
  else if (LIKELY(!isNull))
  {
    if (UNLIKELY(typelessJoin))
    {
      TypelessData largeKey;
      pair<thIterator, thIterator> range;

      largeKey = makeTypelessKey(largeSideRow, largeKeyColumns, keyLength, &tmpKeyAlloc[threadID], smallRG,
                                 smallKeyColumns);

      // A zero-length key is not looked up.
      if (largeKey.len == 0)
        return;

      uint bucket = bucketPicker((char*)largeKey.data, largeKey.len, bpSeed) & bucketMask;
      range = ht[bucket]->equal_range(largeKey);

      if (range.first == range.second && !(joinType & (LARGEOUTER | MATCHNULLS)))
        return;

      for (; range.first != range.second; ++range.first)
        matches->push_back(range.first->second);
    }
    else if (largeSideRow.getColType(largeKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE && !ld.empty())
    {
      // This is a compare of two long double
      long double largeKey;
      pair<ldIterator, ldIterator> range;

      largeKey = largeSideRow.getLongDoubleField(largeKeyColumns[0]);
      uint bucket = bucketPicker((char*)&largeKey, longDoubleHashLen, bpSeed) & bucketMask;
      range = ld[bucket]->equal_range(largeKey);

      if (range.first == range.second && !(joinType & (LARGEOUTER | MATCHNULLS)))
        return;

      for (; range.first != range.second; ++range.first)
      {
        matches->push_back(range.first->second);
      }
    }
    else if (!smallRG.usesStringTable())
    {
      int64_t largeKey;

      if (largeSideRow.getColType(largeKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE)
      {
        largeKey = (int64_t)largeSideRow.getLongDoubleField(largeKeyColumns[0]);
      }
      else if (largeSideRow.isUnsigned(largeKeyColumns[0]))
      {
        largeKey = (int64_t)largeSideRow.getUintField(largeKeyColumns[0]);
      }
      else
      {
        largeKey = largeSideRow.getIntField(largeKeyColumns[0]);
      }

      if (!ld.empty())
      {
        // Compare against long double
        long double ldKey = largeKey;
        uint bucket = bucketPicker((char*)&ldKey, longDoubleHashLen, bpSeed) & bucketMask;
        auto range = ld[bucket]->equal_range(ldKey);

        if (range.first == range.second && !(joinType & (LARGEOUTER | MATCHNULLS)))
          return;

        for (; range.first != range.second; ++range.first)
          matches->push_back(range.first->second);
      }
      else
      {
        uint bucket = bucketPicker((char*)&largeKey, sizeof(largeKey), bpSeed) & bucketMask;
        auto range = h[bucket]->equal_range(largeKey);

        if (range.first == range.second && !(joinType & (LARGEOUTER | MATCHNULLS)))
          return;

        for (; range.first != range.second; ++range.first)
          matches->emplace_back(rowgroup::Row::Pointer(range.first->second));
      }
    }
    else
    {
      int64_t largeKey = largeSideRow.getIntField(largeKeyColumns[0]);
      uint bucket = bucketPicker((char*)&largeKey, sizeof(largeKey), bpSeed) & bucketMask;
      auto range = sth[bucket]->equal_range(largeKey);

      if (range.first == range.second && !(joinType & (LARGEOUTER | MATCHNULLS)))
        return;

      for (; range.first != range.second; ++range.first)
        matches->push_back(range.first->second);
    }
  }

  if (UNLIKELY(largeOuterJoin() && matches->size() == 0))
  {
    // cout << "Matched the NULL row: " << smallNullRow.toString() << endl;
    matches->push_back(smallNullRow.getPointer());
  }

  if (UNLIKELY(inUM() && (joinType & MATCHNULLS) && !isNull && !typelessJoin))
  {
    if (largeRG.getColType(largeKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE && !ld.empty())
    {
      // Hash the same number of bytes as insertion does. Hashing
      // sizeof(long double) bytes would include bytes that insertion never
      // hashed and could select a bucket that does not hold the NULL rows.
      uint bucket =
          bucketPicker((char*)&(joblist::LONGDOUBLENULL), longDoubleHashLen, bpSeed) & bucketMask;
      pair<ldIterator, ldIterator> range = ld[bucket]->equal_range(joblist::LONGDOUBLENULL);

      for (; range.first != range.second; ++range.first)
        matches->push_back(range.first->second);
    }
    else if (!smallRG.usesStringTable())
    {
      auto nullVal = getJoinNullValue();
      uint bucket = bucketPicker((char*)&nullVal, sizeof(nullVal), bpSeed) & bucketMask;
      pair<iterator, iterator> range = h[bucket]->equal_range(nullVal);

      for (; range.first != range.second; ++range.first)
        matches->emplace_back(rowgroup::Row::Pointer(range.first->second));
    }
    else
    {
      auto nullVal = getJoinNullValue();
      uint bucket = bucketPicker((char*)&nullVal, sizeof(nullVal), bpSeed) & bucketMask;
      pair<sthash_t::iterator, sthash_t::iterator> range = sth[bucket]->equal_range(nullVal);

      for (; range.first != range.second; ++range.first)
        matches->push_back(range.first->second);
    }
  }

  /* Bug 3524.  For 'not in' queries this matches everything.
   */
  if (UNLIKELY(inUM() && isNull && antiJoin() && (joinType & MATCHNULLS)))
  {
    if (!typelessJoin)
    {
      if (smallRG.getColType(smallKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE)
      {
        ldIterator it;

        for (uint i = 0; i < bucketCount; i++)
          for (it = ld[i]->begin(); it != ld[i]->end(); ++it)
            matches->push_back(it->second);
      }
      else if (!smallRG.usesStringTable())
      {
        iterator it;

        for (uint i = 0; i < bucketCount; i++)
          for (it = h[i]->begin(); it != h[i]->end(); ++it)
            matches->emplace_back(rowgroup::Row::Pointer(it->second));
      }
      else
      {
        sthash_t::iterator it;

        for (uint i = 0; i < bucketCount; i++)
          for (it = sth[i]->begin(); it != sth[i]->end(); ++it)
            matches->push_back(it->second);
      }
    }
    else
    {
      thIterator it;

      for (uint i = 0; i < bucketCount; i++)
        for (it = ht[i]->begin(); it != ht[i]->end(); ++it)
          matches->push_back(it->second);
    }
  }
}
 | 
						|
 | 
						|
using unordered_set_int128 = std::unordered_set<int128_t, utils::Hash128, utils::Equal128>;
 | 
						|
 | 
						|
void TupleJoiner::doneInserting()
 | 
						|
{
 | 
						|
  // a minor textual cleanup
 | 
						|
#ifdef TJ_DEBUG
 | 
						|
#define CHECKSIZE                         \
 | 
						|
  if (uniquer.size() > uniqueLimit)       \
 | 
						|
  {                                       \
 | 
						|
    cout << "too many discrete values\n"; \
 | 
						|
    return;                               \
 | 
						|
  }
 | 
						|
#else
 | 
						|
#define CHECKSIZE                   \
 | 
						|
  if (uniquer.size() > uniqueLimit) \
 | 
						|
    return;
 | 
						|
#endif
 | 
						|
 | 
						|
  uint32_t col;
 | 
						|
 | 
						|
  /* Put together the discrete values for the runtime casual partitioning restriction */
 | 
						|
 | 
						|
  finished = true;
 | 
						|
 | 
						|
  for (col = 0; col < smallKeyColumns.size(); col++)
 | 
						|
  {
 | 
						|
    unordered_set_int128 uniquer;
 | 
						|
    unordered_set_int128::iterator uit;
 | 
						|
    sthash_t::iterator sthit;
 | 
						|
    hash_t::iterator hit;
 | 
						|
    ldhash_t::iterator ldit;
 | 
						|
    typelesshash_t::iterator thit;
 | 
						|
    uint32_t i, pmpos = 0, rowCount;
 | 
						|
    Row smallRow;
 | 
						|
    auto smallSideColIdx = smallKeyColumns[col];
 | 
						|
    auto smallSideColType = smallRG.getColType(smallSideColIdx);
 | 
						|
 | 
						|
    smallRG.initRow(&smallRow);
 | 
						|
 | 
						|
    if (smallRow.isCharType(smallSideColIdx))
 | 
						|
      continue;
 | 
						|
 | 
						|
    rowCount = size();
 | 
						|
 | 
						|
    uint bucket = 0;
 | 
						|
    if (joinAlg == PM)
 | 
						|
      pmpos = 0;
 | 
						|
    else if (typelessJoin)
 | 
						|
      thit = ht[bucket]->begin();
 | 
						|
    else if (isLongDouble(smallRG.getColType(smallKeyColumns[0])))
 | 
						|
      ldit = ld[bucket]->begin();
 | 
						|
    else if (!smallRG.usesStringTable())
 | 
						|
      hit = h[bucket]->begin();
 | 
						|
    else
 | 
						|
      sthit = sth[bucket]->begin();
 | 
						|
 | 
						|
    for (i = 0; i < rowCount; i++)
 | 
						|
    {
 | 
						|
      if (joinAlg == PM)
 | 
						|
        smallRow.setPointer((*rows)[pmpos++]);
 | 
						|
      else if (typelessJoin)
 | 
						|
      {
 | 
						|
        while (thit == ht[bucket]->end())
 | 
						|
          thit = ht[++bucket]->begin();
 | 
						|
        smallRow.setPointer(thit->second);
 | 
						|
        ++thit;
 | 
						|
      }
 | 
						|
      else if (isLongDouble(smallSideColType))
 | 
						|
      {
 | 
						|
        while (ldit == ld[bucket]->end())
 | 
						|
          ldit = ld[++bucket]->begin();
 | 
						|
        smallRow.setPointer(ldit->second);
 | 
						|
        ++ldit;
 | 
						|
      }
 | 
						|
      else if (!smallRG.usesStringTable())
 | 
						|
      {
 | 
						|
        while (hit == h[bucket]->end())
 | 
						|
          hit = h[++bucket]->begin();
 | 
						|
        smallRow.setPointer(rowgroup::Row::Pointer(hit->second));
 | 
						|
        ++hit;
 | 
						|
      }
 | 
						|
      else
 | 
						|
      {
 | 
						|
        while (sthit == sth[bucket]->end())
 | 
						|
          sthit = sth[++bucket]->begin();
 | 
						|
        smallRow.setPointer(sthit->second);
 | 
						|
        ++sthit;
 | 
						|
      }
 | 
						|
 | 
						|
      if (isLongDouble(smallSideColType))
 | 
						|
      {
 | 
						|
        double dval = (double)roundl(smallRow.getLongDoubleField(smallSideColIdx));
 | 
						|
        switch (largeRG.getColType(largeKeyColumns[col]))
 | 
						|
        {
 | 
						|
          case CalpontSystemCatalog::DOUBLE:
 | 
						|
          case CalpontSystemCatalog::UDOUBLE:
 | 
						|
          case CalpontSystemCatalog::FLOAT:
 | 
						|
          case CalpontSystemCatalog::UFLOAT:
 | 
						|
          {
 | 
						|
            uniquer.insert(*(int64_t*)&dval);
 | 
						|
            break;
 | 
						|
          }
 | 
						|
          default:
 | 
						|
          {
 | 
						|
            uniquer.insert((int64_t)dval);
 | 
						|
          }
 | 
						|
        }
 | 
						|
      }
 | 
						|
      else if (datatypes::isWideDecimalType(smallSideColType, smallRow.getColumnWidth(smallSideColIdx)))
 | 
						|
      {
 | 
						|
        uniquer.insert(smallRow.getTSInt128Field(smallSideColIdx).getValue());
 | 
						|
      }
 | 
						|
      else if (smallRow.isUnsigned(smallSideColIdx))
 | 
						|
      {
 | 
						|
        uniquer.insert((int64_t)smallRow.getUintField(smallSideColIdx));
 | 
						|
      }
 | 
						|
      else
 | 
						|
      {
 | 
						|
        uniquer.insert(smallRow.getIntField(smallSideColIdx));
 | 
						|
      }
 | 
						|
 | 
						|
      CHECKSIZE;
 | 
						|
    }
 | 
						|
 | 
						|
    discreteValues[col] = true;
 | 
						|
    cpValues[col].clear();
 | 
						|
#ifdef TJ_DEBUG
 | 
						|
    cout << "inserting " << uniquer.size() << " discrete values\n";
 | 
						|
#endif
 | 
						|
 | 
						|
    for (uit = uniquer.begin(); uit != uniquer.end(); ++uit)
 | 
						|
      cpValues[col].push_back(*uit);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void TupleJoiner::setInPM()
 | 
						|
{
 | 
						|
  joinAlg = PM;
 | 
						|
}
 | 
						|
 | 
						|
void TupleJoiner::umJoinConvert(size_t begin, size_t end)
 | 
						|
{
 | 
						|
  utils::setThreadName("TJUMJoinConvert1");
 | 
						|
 | 
						|
  Row smallRow;
 | 
						|
  smallRG.initRow(&smallRow);
 | 
						|
 | 
						|
  while (begin < end)
 | 
						|
  {
 | 
						|
    smallRow.setPointer((*rows)[begin++]);
 | 
						|
    insert(smallRow);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void TupleJoiner::setInUM()
 | 
						|
{
 | 
						|
  Row smallRow;
 | 
						|
  uint32_t i, size;
 | 
						|
 | 
						|
  if (joinAlg == UM)
 | 
						|
    return;
 | 
						|
 | 
						|
  joinAlg = UM;
 | 
						|
  size = rows->size();
 | 
						|
  size_t chunkSize =
 | 
						|
      ((size / numCores) + 1 < 50000 ? 50000
 | 
						|
                                     : (size / numCores) + 1);  // don't start a thread to process < 50k rows
 | 
						|
 | 
						|
  utils::VLArray<uint64_t> jobs(numCores);
 | 
						|
  i = 0;
 | 
						|
  for (size_t firstRow = 0; i < (uint)numCores && firstRow < size; i++, firstRow += chunkSize)
 | 
						|
    jobs[i] = jobstepThreadPool->invoke(
 | 
						|
        [this, firstRow, chunkSize, size]
 | 
						|
        { this->umJoinConvert(firstRow, (firstRow + chunkSize < size ? firstRow + chunkSize : size)); });
 | 
						|
 | 
						|
  for (uint j = 0; j < i; j++)
 | 
						|
    jobstepThreadPool->join(jobs[j]);
 | 
						|
 | 
						|
#ifdef TJ_DEBUG
 | 
						|
  cout << "done\n";
 | 
						|
#endif
 | 
						|
  initRowsVector();
 | 
						|
 | 
						|
  if (typelessJoin)
 | 
						|
  {
 | 
						|
    for (i = 0; i < threadCount; i++)
 | 
						|
    {
 | 
						|
      auto alloc = resourceManager_->getAllocator<utils::FixedAllocatorBufType>();
 | 
						|
      tmpKeyAlloc.emplace_back(FixedAllocator(alloc, keyLength, true));
 | 
						|
    }
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
// Worker body used by setInUM(vector<RGData>&): names the calling thread and passes each
// RGData at positions [begin, end) of rgs to insertRGData() through a private copy of smallRG.
void TupleJoiner::umJoinConvert(uint threadID, vector<RGData>& rgs, size_t begin, size_t end)
{
  utils::setThreadName("TJUMJoinConvert2");

  RowGroup localRG(smallRG);

  for (size_t idx = begin; idx < end; ++idx)
  {
    localRG.setData(&rgs[idx]);
    insertRGData(localRG, threadID);
  }
}
 | 
						|
 | 
						|
// Switches the joiner to the UM algorithm using the given rowgroups (no-op if it is already UM).
//
// initRowsVector() is called first. The rowgroups in rgs are then inserted through
// umJoinConvert() by jobs submitted to jobstepThreadPool, at most numCores of them; each job
// covers one contiguous chunk and receives its job index as thread id. Once all jobs are
// joined, typeless joins get one temporary-key FixedAllocator per thread in tmpKeyAlloc.
void TupleJoiner::setInUM(vector<RGData>& rgs)
{
  if (joinAlg == UM)
    return;

  initRowsVector();

  joinAlg = UM;

  // Kept as size_t so that the rowgroup count is never truncated.
  const size_t rgTotal = rgs.size();
  // don't issue jobs for < 10 rowgroups
  const size_t chunkSize = std::max<size_t>((rgTotal / numCores) + 1, 10);

  utils::VLArray<uint64_t> jobs(numCores);
  uint32_t jobCount = 0;

  for (size_t firstRG = 0; jobCount < (uint)numCores && firstRG < rgTotal; jobCount++, firstRG += chunkSize)
  {
    // The last chunk is clipped to the end of the rowgroup vector.
    const size_t lastRG = std::min(firstRG + chunkSize, rgTotal);
    jobs[jobCount] = jobstepThreadPool->invoke([this, jobCount, firstRG, lastRG, &rgs]
                                               { this->umJoinConvert(jobCount, rgs, firstRG, lastRG); });
  }

  for (uint32_t j = 0; j < jobCount; j++)
    jobstepThreadPool->join(jobs[j]);

#ifdef TJ_DEBUG
  cout << "done\n";
#endif

  if (typelessJoin)
  {
    for (uint32_t t = 0; t < threadCount; t++)
    {
      auto alloc = resourceManager_->getAllocator<utils::FixedAllocatorBufType>();
      tmpKeyAlloc.emplace_back(FixedAllocator(alloc, keyLength, true));
    }
  }
}
 | 
						|
 | 
						|
// Stores jr as the PM join result array of thread threadID.
// jr is received by value, so it is moved into place rather than copied a second time.
void TupleJoiner::setPMJoinResults(std::shared_ptr<vector<uint32_t>[]> jr, uint32_t threadID)
{
  pmJoinResults[threadID] = std::move(jr);
}
 | 
						|
 | 
						|
void TupleJoiner::markMatches(uint32_t threadID, uint32_t rowCount)
 | 
						|
{
 | 
						|
  std::shared_ptr<vector<uint32_t>[]> matches = pmJoinResults[threadID];
 | 
						|
  uint32_t i, j;
 | 
						|
 | 
						|
  for (i = 0; i < rowCount; i++)
 | 
						|
    for (j = 0; j < matches[i].size(); j++)
 | 
						|
    {
 | 
						|
      if (matches[i][j] < rows->size())
 | 
						|
      {
 | 
						|
        smallRow[threadID].setPointer((*rows)[matches[i][j]]);
 | 
						|
        smallRow[threadID].markRow();
 | 
						|
      }
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
void TupleJoiner::markMatches(uint32_t threadID, const vector<Row::Pointer>& matches)
 | 
						|
{
 | 
						|
  uint32_t rowCount = matches.size();
 | 
						|
  uint32_t i;
 | 
						|
 | 
						|
  for (i = 0; i < rowCount; i++)
 | 
						|
  {
 | 
						|
    smallRow[threadID].setPointer(matches[i]);
 | 
						|
    smallRow[threadID].markRow();
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
std::shared_ptr<std::vector<uint32_t>[]> TupleJoiner::getPMJoinArrays(uint32_t threadID)
 | 
						|
{
 | 
						|
  return pmJoinResults[threadID];
 | 
						|
}
 | 
						|
 | 
						|
void TupleJoiner::setThreadCount(uint32_t cnt)
 | 
						|
{
 | 
						|
  threadCount = cnt;
 | 
						|
  pmJoinResults.reset(new std::shared_ptr<vector<uint32_t>[]>[cnt]);
 | 
						|
  smallRow.reset(new Row[cnt]);
 | 
						|
 | 
						|
  for (uint32_t i = 0; i < cnt; i++)
 | 
						|
    smallRG.initRow(&smallRow[i]);
 | 
						|
 | 
						|
  if (typelessJoin)
 | 
						|
  {
 | 
						|
    for (uint32_t i = 0; i < threadCount; i++)
 | 
						|
    {
 | 
						|
      auto alloc = resourceManager_->getAllocator<utils::FixedAllocatorBufType>();
 | 
						|
      tmpKeyAlloc.emplace_back(FixedAllocator(alloc, keyLength, true));
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  if (fe)
 | 
						|
  {
 | 
						|
    fes.reset(new funcexp::FuncExpWrapper[cnt]);
 | 
						|
 | 
						|
    for (uint32_t i = 0; i < cnt; i++)
 | 
						|
      fes[i] = *fe;
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
// Replaces the content of *out with pointers to all small-side rows that are not marked.
// In PM mode the rows are read from *rows; otherwise from the hash table set in use
// (typeless, long double, plain, or string-table based).
void TupleJoiner::getUnmarkedRows(vector<Row::Pointer>* out)
{
  Row probe;

  smallRG.initRow(&probe);
  out->clear();

  // Appends ptr to *out unless the row it addresses is marked.
  const auto keepIfUnmarked = [&probe, out](const Row::Pointer& ptr)
  {
    probe.setPointer(ptr);

    if (!probe.isMarked())
      out->push_back(ptr);
  };

  if (inPM())
  {
    const uint32_t rowTotal = rows->size();

    for (uint32_t pos = 0; pos < rowTotal; ++pos)
      keepIfUnmarked((*rows)[pos]);

    return;
  }

  // Visits the mapped value of every entry in every bucket of the given table set.
  const auto scanBuckets = [&](const auto& tables)
  {
    for (uint b = 0; b < bucketCount; b++)
      for (auto it = tables[b]->begin(); it != tables[b]->end(); ++it)
        keepIfUnmarked(rowgroup::Row::Pointer(it->second));
  };

  if (typelessJoin)
    scanBuckets(ht);
  else if (smallRG.getColType(smallKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE)
    scanBuckets(ld);
  else if (!smallRG.usesStringTable())
    scanBuckets(h);
  else
    scanBuckets(sth);
}
 | 
						|
 | 
						|
// Installs pt as the function-expression filter and keeps the WITHFCNEXP bit of joinType
// in sync with it: set when the filter is non-null, cleared otherwise.
void TupleJoiner::setFcnExpFilter(boost::shared_ptr<funcexp::FuncExpWrapper> pt)
{
  fe = pt;

  if (!fe)
  {
    joinType &= ~WITHFCNEXP;
    return;
  }

  joinType |= WITHFCNEXP;
}
 | 
						|
 | 
						|
void TupleJoiner::updateCPData(const Row& r)
 | 
						|
{
 | 
						|
  uint32_t col;
 | 
						|
 | 
						|
  if (antiJoin() || largeOuterJoin())
 | 
						|
    return;
 | 
						|
 | 
						|
  for (col = 0; col < smallKeyColumns.size(); col++)
 | 
						|
  {
 | 
						|
    auto colIdx = smallKeyColumns[col];
 | 
						|
    if (r.isLongString(colIdx))
 | 
						|
      continue;
 | 
						|
 | 
						|
    auto &min = cpValues[col][0], &max = cpValues[col][1];
 | 
						|
 | 
						|
    if (r.isCharType(colIdx))
 | 
						|
    {
 | 
						|
      datatypes::Charset cs(r.getCharset(colIdx));
 | 
						|
      int64_t val = r.getIntField(colIdx);
 | 
						|
 | 
						|
      if (datatypes::TCharShort::strnncollsp(cs, val, min, r.getColumnWidth(smallKeyColumns[col])) < 0 ||
 | 
						|
          ((int64_t)min) == numeric_limits<int64_t>::max())
 | 
						|
      {
 | 
						|
        min = val;
 | 
						|
      }
 | 
						|
 | 
						|
      if (datatypes::TCharShort::strnncollsp(cs, val, max, r.getColumnWidth(smallKeyColumns[col])) > 0 ||
 | 
						|
          ((int64_t)max) == numeric_limits<int64_t>::min())
 | 
						|
      {
 | 
						|
        max = val;
 | 
						|
      }
 | 
						|
    }
 | 
						|
    else if (r.isUnsigned(colIdx))
 | 
						|
    {
 | 
						|
      uint128_t uval;
 | 
						|
 | 
						|
      if (r.getColType(colIdx) == CalpontSystemCatalog::LONGDOUBLE)
 | 
						|
      {
 | 
						|
        double dval = (double)roundl(r.getLongDoubleField(smallKeyColumns[col]));
 | 
						|
        switch (largeRG.getColType(largeKeyColumns[col]))
 | 
						|
        {
 | 
						|
          case CalpontSystemCatalog::DOUBLE:
 | 
						|
          case CalpontSystemCatalog::UDOUBLE:
 | 
						|
          case CalpontSystemCatalog::FLOAT:
 | 
						|
          case CalpontSystemCatalog::UFLOAT:
 | 
						|
          {
 | 
						|
            uval = *(uint64_t*)&dval;
 | 
						|
            break;
 | 
						|
          }
 | 
						|
          default:
 | 
						|
          {
 | 
						|
            uval = (uint64_t)dval;
 | 
						|
          }
 | 
						|
        }
 | 
						|
      }
 | 
						|
      else if (datatypes::isWideDecimalType(r.getColType(colIdx), r.getColumnWidth(colIdx)))
 | 
						|
      {
 | 
						|
        uval = r.getTSInt128Field(colIdx).getValue();
 | 
						|
      }
 | 
						|
      else
 | 
						|
      {
 | 
						|
        uval = r.getUintField(colIdx);
 | 
						|
      }
 | 
						|
 | 
						|
      if (uval > static_cast<uint128_t>(max))
 | 
						|
        max = static_cast<int128_t>(uval);
 | 
						|
 | 
						|
      if (uval < static_cast<uint128_t>(min))
 | 
						|
        min = static_cast<int128_t>(uval);
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
      int128_t val = 0;
 | 
						|
 | 
						|
      if (r.getColType(colIdx) == CalpontSystemCatalog::LONGDOUBLE)
 | 
						|
      {
 | 
						|
        double dval = (double)roundl(r.getLongDoubleField(colIdx));
 | 
						|
        switch (largeRG.getColType(largeKeyColumns[col]))
 | 
						|
        {
 | 
						|
          case CalpontSystemCatalog::DOUBLE:
 | 
						|
          case CalpontSystemCatalog::UDOUBLE:
 | 
						|
          case CalpontSystemCatalog::FLOAT:
 | 
						|
          case CalpontSystemCatalog::UFLOAT:
 | 
						|
          {
 | 
						|
            val = *(int64_t*)&dval;
 | 
						|
            break;
 | 
						|
          }
 | 
						|
          default:
 | 
						|
          {
 | 
						|
            val = (int64_t)dval;
 | 
						|
          }
 | 
						|
        }
 | 
						|
      }
 | 
						|
      else if (datatypes::isWideDecimalType(r.getColType(colIdx), r.getColumnWidth(colIdx)))
 | 
						|
      {
 | 
						|
        val = r.getTSInt128Field(colIdx).getValue();
 | 
						|
      }
 | 
						|
      else
 | 
						|
      {
 | 
						|
        val = r.getIntField(colIdx);
 | 
						|
      }
 | 
						|
 | 
						|
      if (val > max)
 | 
						|
        max = val;
 | 
						|
 | 
						|
      if (val < min)
 | 
						|
        min = val;
 | 
						|
    }
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
size_t TupleJoiner::size() const
 | 
						|
{
 | 
						|
  if (joinAlg == UM || joinAlg == INSERTING)
 | 
						|
  {
 | 
						|
    size_t ret = 0;
 | 
						|
    for (uint i = 0; i < bucketCount; i++)
 | 
						|
      if (UNLIKELY(typelessJoin))
 | 
						|
        ret += ht[i]->size();
 | 
						|
      else if (smallRG.getColType(smallKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE)
 | 
						|
        ret += ld[i]->size();
 | 
						|
      else if (!smallRG.usesStringTable())
 | 
						|
        ret += h[i]->size();
 | 
						|
      else
 | 
						|
        ret += sth[i]->size();
 | 
						|
    return ret;
 | 
						|
  }
 | 
						|
 | 
						|
  return rows->size();
 | 
						|
}
 | 
						|
 | 
						|
class TypelessDataStringEncoder
 | 
						|
{
 | 
						|
  const uint8_t* mStr;
 | 
						|
  uint32_t mLength;
 | 
						|
 | 
						|
 public:
 | 
						|
  TypelessDataStringEncoder(const uint8_t* str, uint32_t length) : mStr(str), mLength(length)
 | 
						|
  {
 | 
						|
  }
 | 
						|
  TypelessDataStringEncoder(const utils::ConstString& str)
 | 
						|
   : mStr((const uint8_t*)str.str()), mLength(str.length())
 | 
						|
  {
 | 
						|
  }
 | 
						|
  bool store(uint8_t* to, uint32_t& off, uint32_t keylen) const
 | 
						|
  {
 | 
						|
    if (mLength > 0xFFFF)  // We encode length into two bytes below
 | 
						|
    {
 | 
						|
      throw runtime_error("Cannot join strings greater than 64KB");
 | 
						|
    }
 | 
						|
 | 
						|
    if (off + mLength + 2 > keylen)
 | 
						|
      return true;
 | 
						|
 | 
						|
    to[off++] = mLength / 0xFF;
 | 
						|
    to[off++] = mLength % 0xFF;
 | 
						|
    /*
 | 
						|
      QQ: perhaps now when we put length,
 | 
						|
      we don't need to stop at '\0' bytes any more.
 | 
						|
      If so, the loop below can be replace to memcpy().
 | 
						|
    */
 | 
						|
    for (uint32_t j = 0; j < mLength && mStr[j] != 0; j++)
 | 
						|
    {
 | 
						|
      if (off >= keylen)
 | 
						|
        return true;
 | 
						|
      to[off++] = mStr[j];
 | 
						|
    }
 | 
						|
 | 
						|
    return false;
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
class WideDecimalKeyConverter
 | 
						|
{
 | 
						|
  const Row* mR;
 | 
						|
  uint64_t convertedValue;
 | 
						|
  const uint32_t mKeyColId;
 | 
						|
  uint16_t width;
 | 
						|
 | 
						|
 public:
 | 
						|
  WideDecimalKeyConverter(const Row& r, const uint32_t keyColId)
 | 
						|
   : mR(&r), mKeyColId(keyColId), width(datatypes::MAXDECIMALWIDTH)
 | 
						|
  {
 | 
						|
  }
 | 
						|
  bool isConvertedToSmallSideType() const
 | 
						|
  {
 | 
						|
    return width == datatypes::MAXLEGACYWIDTH;
 | 
						|
  }
 | 
						|
  int64_t getConvertedTInt64() const
 | 
						|
  {
 | 
						|
    return (int64_t)convertedValue;
 | 
						|
  }
 | 
						|
  // Returns true if the value doesn't fit into allowed range for a type.
 | 
						|
  template <typename T, typename AT>
 | 
						|
  bool numericRangeCheckAndConvert(const AT& value)
 | 
						|
  {
 | 
						|
    if (value > AT(std::numeric_limits<T>::max()) || value < AT(std::numeric_limits<T>::min()))
 | 
						|
      return true;
 | 
						|
 | 
						|
    convertedValue = (uint64_t)static_cast<T>(value);
 | 
						|
    return false;
 | 
						|
  }
 | 
						|
  // As of MCS 6.x there is an asumption MCS can't join having
 | 
						|
  // INTEGER and non-INTEGER potentially fractional keys,
 | 
						|
  // e.g. BIGINT to DECIMAL(38,1). It can only join BIGINT to DECIMAL(38).
 | 
						|
  // convert() checks if wide-DECIMAL overflows INTEGER type range
 | 
						|
  // and sets internal width to 0 if it is. If not width is set to 8
 | 
						|
  // and convertedValue is casted to INTEGER type.
 | 
						|
  // This convert() is called in EM to cast smallSide TypelessData
 | 
						|
  // if the key columns has a skew, e.g. INT to DECIMAL(38).
 | 
						|
  inline WideDecimalKeyConverter& convert(const bool otherSideIsIntOrNarrow,
 | 
						|
                                          const execplan::CalpontSystemCatalog::ColDataType otherSideType)
 | 
						|
  {
 | 
						|
    if (otherSideIsIntOrNarrow)
 | 
						|
    {
 | 
						|
      datatypes::TSInt128 integralPart = mR->getTSInt128Field(mKeyColId);
 | 
						|
 | 
						|
      bool isUnsigned = datatypes::isUnsigned(otherSideType);
 | 
						|
      if (isUnsigned)
 | 
						|
      {
 | 
						|
        width = (numericRangeCheckAndConvert<uint64_t>(integralPart)) ? 0 : datatypes::MAXLEGACYWIDTH;
 | 
						|
        return *this;
 | 
						|
      }
 | 
						|
      width = (numericRangeCheckAndConvert<int64_t>(integralPart)) ? 0 : datatypes::MAXLEGACYWIDTH;
 | 
						|
    }
 | 
						|
    return *this;
 | 
						|
  }
 | 
						|
  // Stores the value that might had been converted.
 | 
						|
  inline bool store(TypelessData& typelessData, uint32_t& off, const uint32_t keylen) const
 | 
						|
  {
 | 
						|
    // A note from convert() if there is otherSide column type range
 | 
						|
    // overflow so store() returns TD with len=0. This tells EM to skip this
 | 
						|
    // key b/c it won't match at PP. This happens it is possible to skip
 | 
						|
    // smallSide TD but can't to do the same with largeSide b/c of OUTER joins.
 | 
						|
    if (!width)
 | 
						|
    {
 | 
						|
      typelessData.len = 0;
 | 
						|
      return true;
 | 
						|
    }
 | 
						|
    if (off + width > keylen)
 | 
						|
      return true;
 | 
						|
    switch (width)
 | 
						|
    {
 | 
						|
      case datatypes::MAXDECIMALWIDTH:
 | 
						|
      {
 | 
						|
        mR->storeInt128FieldIntoPtr(mKeyColId, &typelessData.data[off]);
 | 
						|
        break;
 | 
						|
      }
 | 
						|
      default:
 | 
						|
      {
 | 
						|
        datatypes::TUInt64(convertedValue).store(&typelessData.data[off]);
 | 
						|
      }
 | 
						|
    }
 | 
						|
    off += width;
 | 
						|
    return false;
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
// smallSideColWidths is non-nullptr valid pointer only
 | 
						|
// if there is a skew b/w small and large side columns widths.
 | 
						|
uint32 TypelessData::hash(const RowGroup& r, const std::vector<uint32_t>& keyCols,
 | 
						|
                          const std::vector<uint32_t>* smallSideKeyColumnsIds,
 | 
						|
                          const rowgroup::RowGroup* smallSideRG) const
 | 
						|
{
 | 
						|
  // This part is for largeSide hashing using Row at PP.
 | 
						|
  if (!isSmallSide())
 | 
						|
  {
 | 
						|
    return mRowPtr->hashTypeless(keyCols, smallSideKeyColumnsIds,
 | 
						|
                                 (smallSideRG) ? &smallSideRG->getColWidths() : nullptr);
 | 
						|
  }
 | 
						|
  // This part is for smallSide hashing at PP.
 | 
						|
  TypelessDataDecoder decoder(*this);
 | 
						|
  datatypes::MariaDBHasher hasher;
 | 
						|
  for (auto keyColId : keyCols)
 | 
						|
  {
 | 
						|
    switch (r.getColTypes()[keyColId])
 | 
						|
    {
 | 
						|
      case CalpontSystemCatalog::VARCHAR:
 | 
						|
      case CalpontSystemCatalog::CHAR:
 | 
						|
      case CalpontSystemCatalog::TEXT:
 | 
						|
      {
 | 
						|
        CHARSET_INFO* cs = const_cast<RowGroup&>(r).getCharset(keyColId);
 | 
						|
        hasher.add(cs, decoder.scanString());
 | 
						|
        break;
 | 
						|
      }
 | 
						|
      case CalpontSystemCatalog::DECIMAL:
 | 
						|
      {
 | 
						|
        const uint32_t width = std::max(r.getColWidths()[keyColId], datatypes::MAXLEGACYWIDTH);
 | 
						|
        if (isSmallSideWithSkewedData() || width == datatypes::MAXLEGACYWIDTH)
 | 
						|
        {
 | 
						|
          int64_t val = decoder.scanTInt64();
 | 
						|
          hasher.add(&my_charset_bin, reinterpret_cast<const char*>(&val), datatypes::MAXLEGACYWIDTH);
 | 
						|
        }
 | 
						|
        else
 | 
						|
          hasher.add(&my_charset_bin, decoder.scanGeneric(width));
 | 
						|
        break;
 | 
						|
      }
 | 
						|
      default:
 | 
						|
      {
 | 
						|
        hasher.add(&my_charset_bin, decoder.scanGeneric(datatypes::MAXLEGACYWIDTH));
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  return hasher.finalize();
 | 
						|
}
 | 
						|
 | 
						|
// this is smallSide, Row represents largeSide record.
 | 
						|
int TypelessData::cmpToRow(const RowGroup& r, const std::vector<uint32_t>& keyCols, const rowgroup::Row& row,
 | 
						|
                           const std::vector<uint32_t>* smallSideKeyColumnsIds,
 | 
						|
                           const rowgroup::RowGroup* smallSideRG) const
 | 
						|
{
 | 
						|
  TypelessDataDecoder a(*this);
 | 
						|
 | 
						|
  for (uint32_t i = 0; i < keyCols.size(); i++)
 | 
						|
  {
 | 
						|
    auto largeSideKeyColRowIdx = keyCols[i];
 | 
						|
    switch (r.getColType(largeSideKeyColRowIdx))
 | 
						|
    {
 | 
						|
      case CalpontSystemCatalog::VARCHAR:
 | 
						|
      case CalpontSystemCatalog::CHAR:
 | 
						|
      case CalpontSystemCatalog::TEXT:
 | 
						|
      {
 | 
						|
        datatypes::Charset cs(*const_cast<RowGroup&>(r).getCharset(largeSideKeyColRowIdx));
 | 
						|
        ConstString ta = a.scanString();
 | 
						|
        ConstString tb = row.getConstString(largeSideKeyColRowIdx);
 | 
						|
        if (int rc = cs.strnncollsp(ta, tb))
 | 
						|
          return rc;
 | 
						|
        break;
 | 
						|
      }
 | 
						|
      case CalpontSystemCatalog::DECIMAL:
 | 
						|
      {
 | 
						|
        auto largeSideWidth = row.getColumnWidth(largeSideKeyColRowIdx);
 | 
						|
        // First branch processes skewed JOIN, e.g. INT to DECIMAL(38)
 | 
						|
        // else branch processes decimal with common width at both small- and largeSide.
 | 
						|
        if (isSmallSideWithSkewedData() &&
 | 
						|
            largeSideWidth != smallSideRG->getColumnWidth(smallSideKeyColumnsIds->operator[](i)))
 | 
						|
        {
 | 
						|
          if (largeSideWidth == datatypes::MAXLEGACYWIDTH)
 | 
						|
          {
 | 
						|
            if (int rc = a.scanTInt64() != row.getIntField(largeSideKeyColRowIdx))
 | 
						|
              return rc;
 | 
						|
          }
 | 
						|
          else
 | 
						|
          {
 | 
						|
            WideDecimalKeyConverter cv(row, largeSideKeyColRowIdx);
 | 
						|
            if (!cv.convert(true, smallSideRG->getColType(smallSideKeyColumnsIds->operator[](i)))
 | 
						|
                     .isConvertedToSmallSideType())
 | 
						|
              return 1;
 | 
						|
            if (int rc = a.scanTInt64() != cv.getConvertedTInt64())
 | 
						|
              return rc;
 | 
						|
          }
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
          // There is an assumption that both sides here are equal and are either 8 or 16 bytes.
 | 
						|
          if (largeSideWidth == datatypes::MAXDECIMALWIDTH)
 | 
						|
          {
 | 
						|
            if (int rc = a.scanTInt128() != row.getTSInt128Field(largeSideKeyColRowIdx))
 | 
						|
              return rc;
 | 
						|
          }
 | 
						|
          else
 | 
						|
          {
 | 
						|
            if (int rc = a.scanTInt64() != row.getIntField(largeSideKeyColRowIdx))
 | 
						|
              return rc;
 | 
						|
          }
 | 
						|
        }
 | 
						|
        break;
 | 
						|
      }
 | 
						|
      default:
 | 
						|
      {
 | 
						|
        ConstString ta = a.scanGeneric(datatypes::MAXLEGACYWIDTH);
 | 
						|
        if (r.isUnsigned(largeSideKeyColRowIdx))
 | 
						|
        {
 | 
						|
          uint64_t tb = row.getUintField(largeSideKeyColRowIdx);
 | 
						|
          if (int rc = memcmp(ta.str(), &tb, datatypes::MAXLEGACYWIDTH))
 | 
						|
            return rc;
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
          int64_t tb = row.getIntField(largeSideKeyColRowIdx);
 | 
						|
          if (int rc = memcmp(ta.str(), &tb, datatypes::MAXLEGACYWIDTH))
 | 
						|
            return rc;
 | 
						|
        }
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  return 0;  // Equal
 | 
						|
}
 | 
						|
 | 
						|
int TypelessData::cmp(const RowGroup& r, const std::vector<uint32_t>& keyCols, const TypelessData& da,
 | 
						|
                      const TypelessData& db, const std::vector<uint32_t>* smallSideKeyColumnsIds,
 | 
						|
                      const rowgroup::RowGroup* smallSideRG)
 | 
						|
{
 | 
						|
  idbassert(da.isSmallSide() || db.isSmallSide());
 | 
						|
  if (!da.isSmallSide() && db.isSmallSide())
 | 
						|
    return -db.cmpToRow(r, keyCols, da.mRowPtr[0], smallSideKeyColumnsIds, smallSideRG);
 | 
						|
  if (da.isSmallSide() && !db.isSmallSide())
 | 
						|
    return da.cmpToRow(r, keyCols, db.mRowPtr[0], smallSideKeyColumnsIds, smallSideRG);
 | 
						|
 | 
						|
  // This case happens in BPP::addToJoiner when it populates the final
 | 
						|
  // hashmap with multiple smallSide TDs from temp hashmaps.
 | 
						|
  idbassert(da.isSmallSide() && db.isSmallSide());
 | 
						|
 | 
						|
  TypelessDataDecoder a(da);
 | 
						|
  TypelessDataDecoder b(db);
 | 
						|
 | 
						|
  for (uint32_t i = 0; i < keyCols.size(); ++i)
 | 
						|
  {
 | 
						|
    auto keyColIdx = keyCols[i];
 | 
						|
    switch (r.getColTypes()[keyColIdx])
 | 
						|
    {
 | 
						|
      case CalpontSystemCatalog::VARCHAR:
 | 
						|
      case CalpontSystemCatalog::CHAR:
 | 
						|
      case CalpontSystemCatalog::TEXT:
 | 
						|
      {
 | 
						|
        datatypes::Charset cs(*const_cast<RowGroup&>(r).getCharset(keyColIdx));
 | 
						|
        ConstString ta = a.scanString();
 | 
						|
        ConstString tb = b.scanString();
 | 
						|
        if (int rc = cs.strnncollsp(ta, tb))
 | 
						|
          return rc;
 | 
						|
        break;
 | 
						|
      }
 | 
						|
      case CalpontSystemCatalog::DECIMAL:
 | 
						|
      {
 | 
						|
        auto largeSideWidth = r.getColumnWidth(keyColIdx);
 | 
						|
        // First and second branches processes skewed JOIN, e.g. INT to DECIMAL(38)
 | 
						|
        // Third processes decimal with common width at both small- and largeSide.
 | 
						|
        auto width = (da.isSmallSideWithSkewedData() &&
 | 
						|
                      largeSideWidth != smallSideRG->getColumnWidth(smallSideKeyColumnsIds->operator[](i)))
 | 
						|
                         ? datatypes::MAXLEGACYWIDTH
 | 
						|
                         : std::max(r.getColWidths()[keyColIdx], datatypes::MAXLEGACYWIDTH);
 | 
						|
        ConstString ta = a.scanGeneric(width);
 | 
						|
        ConstString tb = b.scanGeneric(width);
 | 
						|
        if (int rc = memcmp(ta.str(), tb.str(), width))
 | 
						|
          return rc;
 | 
						|
        break;
 | 
						|
      }
 | 
						|
      default:
 | 
						|
      {
 | 
						|
        ConstString ta = a.scanGeneric(datatypes::MAXLEGACYWIDTH);
 | 
						|
        ConstString tb = b.scanGeneric(datatypes::MAXLEGACYWIDTH);
 | 
						|
        idbassert(ta.length() == tb.length());
 | 
						|
        // It is impossible to join signed to unsigned types now
 | 
						|
        // but there is a potential error, e.g. uint64 vs negative int64.
 | 
						|
        if (int rc = memcmp(ta.str(), tb.str(), ta.length()))
 | 
						|
          return rc;
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  return 0;  // Equal
 | 
						|
}
 | 
						|
 | 
						|
// Called in joblist code to produce SmallSide TypelessData to be sent to PP.
 | 
						|
TypelessData makeTypelessKey(const Row& r, const vector<uint32_t>& keyCols, uint32_t keylen,
 | 
						|
                             FixedAllocator* fa, const rowgroup::RowGroup& otherSideRG,
 | 
						|
                             const std::vector<uint32_t>& otherKeyCols)
 | 
						|
{
 | 
						|
  TypelessData ret;
 | 
						|
  uint32_t off = 0, i;
 | 
						|
  execplan::CalpontSystemCatalog::ColDataType type;
 | 
						|
 | 
						|
  ret.data = (uint8_t*)fa->allocate();
 | 
						|
  idbassert(keyCols.size() == otherKeyCols.size());
 | 
						|
 | 
						|
  for (i = 0; i < keyCols.size(); i++)
 | 
						|
  {
 | 
						|
    type = r.getColTypes()[keyCols[i]];
 | 
						|
 | 
						|
    if (datatypes::isCharType(type))
 | 
						|
    {
 | 
						|
      // this is a string, copy a normalized version
 | 
						|
      const utils::ConstString str = r.getConstString(keyCols[i]);
 | 
						|
      if (TypelessDataStringEncoder(str).store(ret.data, off, keylen))
 | 
						|
        goto toolong;
 | 
						|
    }
 | 
						|
    else if (datatypes::isWideDecimalType(type, r.getColumnWidth(keyCols[i])))
 | 
						|
    {
 | 
						|
      bool otherSideIsIntOrNarrow = otherSideRG.getColumnWidth(otherKeyCols[i]) <= datatypes::MAXLEGACYWIDTH;
 | 
						|
      // useless if otherSideIsInt is false
 | 
						|
      auto otherSideType = (otherSideIsIntOrNarrow) ? otherSideRG.getColType(otherKeyCols[i])
 | 
						|
                                                    : datatypes::SystemCatalog::UNDEFINED;
 | 
						|
      if (WideDecimalKeyConverter(r, keyCols[i])
 | 
						|
              .convert(otherSideIsIntOrNarrow, otherSideType)
 | 
						|
              .store(ret, off, keylen))
 | 
						|
      {
 | 
						|
        goto toolong;
 | 
						|
      }
 | 
						|
    }
 | 
						|
    else if (datatypes::isLongDouble(type))
 | 
						|
    {
 | 
						|
      if (off + sizeof(long double) > keylen)
 | 
						|
        goto toolong;
 | 
						|
      // Small side is a long double. Since CS can't store larger than DOUBLE,
 | 
						|
      // we need to convert to whatever type large side is -- double or int64
 | 
						|
      long double keyld = r.getLongDoubleField(keyCols[i]);
 | 
						|
      switch (otherSideRG.getColType(otherKeyCols[i]))
 | 
						|
      {
 | 
						|
        case CalpontSystemCatalog::DOUBLE:
 | 
						|
        case CalpontSystemCatalog::UDOUBLE:
 | 
						|
        case CalpontSystemCatalog::FLOAT:
 | 
						|
        case CalpontSystemCatalog::UFLOAT:
 | 
						|
        {
 | 
						|
          if (off + 8 > keylen)
 | 
						|
            goto toolong;
 | 
						|
          if (keyld > MAX_DOUBLE || keyld < MIN_DOUBLE)
 | 
						|
          {
 | 
						|
            ret.len = 0;
 | 
						|
            return ret;
 | 
						|
          }
 | 
						|
          else
 | 
						|
          {
 | 
						|
            double d = (double)keyld;
 | 
						|
            *((int64_t*)&ret.data[off]) = *(int64_t*)&d;
 | 
						|
          }
 | 
						|
          break;
 | 
						|
        }
 | 
						|
        case CalpontSystemCatalog::LONGDOUBLE:
 | 
						|
        {
 | 
						|
          if (off + sizeof(long double) > keylen)
 | 
						|
            goto toolong;
 | 
						|
          *((long double*)&ret.data[off]) = keyld;
 | 
						|
          off += sizeof(long double);
 | 
						|
          break;
 | 
						|
        }
 | 
						|
        default:
 | 
						|
        {
 | 
						|
          if (off + 8 > keylen)
 | 
						|
            goto toolong;
 | 
						|
          if (r.isUnsigned(keyCols[i]) && keyld > MAX_UBIGINT)
 | 
						|
          {
 | 
						|
            ret.len = 0;
 | 
						|
            return ret;
 | 
						|
          }
 | 
						|
          else if (keyld > MAX_BIGINT || keyld < MIN_BIGINT)
 | 
						|
          {
 | 
						|
            ret.len = 0;
 | 
						|
            return ret;
 | 
						|
          }
 | 
						|
          else
 | 
						|
          {
 | 
						|
            *((int64_t*)&ret.data[off]) = (int64_t)keyld;
 | 
						|
            off += 8;
 | 
						|
          }
 | 
						|
          break;
 | 
						|
        }
 | 
						|
      }
 | 
						|
    }
 | 
						|
    else if (r.isUnsigned(keyCols[i]))
 | 
						|
    {
 | 
						|
      if (off + 8 > keylen)
 | 
						|
        goto toolong;
 | 
						|
      *((uint64_t*)&ret.data[off]) = r.getUintField(keyCols[i]);
 | 
						|
      off += 8;
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
      if (off + 8 > keylen)
 | 
						|
        goto toolong;
 | 
						|
      *((int64_t*)&ret.data[off]) = r.getIntField(keyCols[i]);
 | 
						|
      off += 8;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  ret.len = off;
 | 
						|
  fa->truncateBy(keylen - off);
 | 
						|
  return ret;
 | 
						|
toolong:
 | 
						|
  fa->truncateBy(keylen);
 | 
						|
  ret.len = 0;
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
// The method is used by disk-based JOIN and it is not collation or wide DECIMAL aware.
 | 
						|
uint64_t getHashOfTypelessKey(const Row& r, const vector<uint32_t>& keyCols, uint32_t seed)
 | 
						|
{
 | 
						|
  Hasher_r hasher;
 | 
						|
  uint64_t ret = seed, tmp;
 | 
						|
  uint32_t i;
 | 
						|
  uint32_t width = 0;
 | 
						|
  char nullChar = '\0';
 | 
						|
  execplan::CalpontSystemCatalog::ColDataType type;
 | 
						|
 | 
						|
  for (i = 0; i < keyCols.size(); i++)
 | 
						|
  {
 | 
						|
    type = r.getColTypes()[keyCols[i]];
 | 
						|
 | 
						|
    if (type == CalpontSystemCatalog::VARCHAR || type == CalpontSystemCatalog::CHAR ||
 | 
						|
        type == CalpontSystemCatalog::TEXT)
 | 
						|
    {
 | 
						|
      // this is a string, copy a normalized version
 | 
						|
      const utils::ConstString str = r.getConstString(keyCols[i]);
 | 
						|
      ret = hasher(str.str(), str.length(), ret);
 | 
						|
      ret = hasher(&nullChar, 1, ret);
 | 
						|
      width += str.length() + 1;
 | 
						|
    }
 | 
						|
    else if (r.getColType(keyCols[i]) == CalpontSystemCatalog::LONGDOUBLE)
 | 
						|
    {
 | 
						|
      long double tmp = r.getLongDoubleField(keyCols[i]);
 | 
						|
      ret = hasher((char*)&tmp, sizeof(long double), ret);
 | 
						|
      width += sizeof(long double);
 | 
						|
    }
 | 
						|
    else if (r.isUnsigned(keyCols[i]))
 | 
						|
    {
 | 
						|
      tmp = r.getUintField(keyCols[i]);
 | 
						|
      ret = hasher((char*)&tmp, 8, ret);
 | 
						|
      width += 8;
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
      tmp = r.getIntField(keyCols[i]);
 | 
						|
      ret = hasher((char*)&tmp, 8, ret);
 | 
						|
      width += 8;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  ret = hasher.finalize(ret, width);
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
// Returns the key bytes as space separated hexadecimal numbers; every byte,
// including the last one, is followed by a single space.
string TypelessData::toString() const
{
  ostringstream out;
  out << hex;

  uint32_t pos = 0;
  while (pos < len)
  {
    // Widen the byte so that it is printed as a number, not as a character.
    out << (uint32_t)data[pos] << " ";
    ++pos;
  }

  out << dec;
  return out.str();
}
 | 
						|
 | 
						|
// Writes the key into the ByteStream: the length first, then `len` bytes of
// key data.
void TypelessData::serialize(messageqcpp::ByteStream& b) const
{
  b << this->len;
  b.append(this->data, this->len);
  // Flags are not sent b/c they are locally significant now.
}
 | 
						|
 | 
						|
// Reads a key from the ByteStream: the length first, then `len` bytes of key
// data that are copied into a buffer allocated from `fa`. The consumed bytes
// are skipped in the stream afterwards.
void TypelessData::deserialize(messageqcpp::ByteStream& b, utils::PoolAllocator& fa)
{
  b >> len;

  uint8_t* const keyBuffer = (uint8_t*)fa.allocate(len);
  memcpy(keyBuffer, b.buf(), len);
  data = keyBuffer;

  b.advance(len);
}
 | 
						|
 | 
						|
bool TupleJoiner::hasNullJoinColumn(const Row& r) const
 | 
						|
{
 | 
						|
  uint64_t key;
 | 
						|
 | 
						|
  for (uint32_t i = 0; i < largeKeyColumns.size(); i++)
 | 
						|
  {
 | 
						|
    if (r.isNullValue(largeKeyColumns[i]))
 | 
						|
      return true;
 | 
						|
 | 
						|
    if (UNLIKELY(bSignedUnsignedJoin))
 | 
						|
    {
 | 
						|
      // BUG 5628 If this is a signed/unsigned join column and the sign bit is set on either
 | 
						|
      // side, then this row should not compare. Treat as NULL to prevent compare, even if
 | 
						|
      // the bit patterns match.
 | 
						|
      if (smallRG.isUnsigned(smallKeyColumns[i]) != largeRG.isUnsigned(largeKeyColumns[i]))
 | 
						|
      {
 | 
						|
        if (r.isUnsigned(largeKeyColumns[i]))
 | 
						|
          key = r.getUintField(largeKeyColumns[i]);  // Does not propogate sign bit
 | 
						|
        else
 | 
						|
          key = r.getIntField(largeKeyColumns[i]);  // Propogates sign bit
 | 
						|
 | 
						|
        if (key & 0x8000000000000000ULL)
 | 
						|
        {
 | 
						|
          return true;
 | 
						|
        }
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return false;
 | 
						|
}
 | 
						|
 | 
						|
// Returns a copy of the table name stored in this joiner.
string TupleJoiner::getTableName() const
{
  return this->tableName;
}
 | 
						|
 | 
						|
void TupleJoiner::setTableName(const string& tname)
 | 
						|
{
 | 
						|
  tableName = tname;
 | 
						|
}
 | 
						|
 | 
						|
void TupleJoiner::clearHashMaps()
 | 
						|
{
 | 
						|
  ht.clear();
 | 
						|
  ld.clear();
 | 
						|
  sth.clear();
 | 
						|
  h.clear();
 | 
						|
}
 | 
						|
 | 
						|
/* Disk based join support */
 | 
						|
void TupleJoiner::clearData()
 | 
						|
{
 | 
						|
  // This loop calls dtors and deallocates mem.
 | 
						|
  clearHashMaps();
 | 
						|
  initHashMaps(smallKeyColumns[0]);
 | 
						|
  initRowsVector();
 | 
						|
  finished = false;
 | 
						|
}
 | 
						|
 | 
						|
std::shared_ptr<TupleJoiner> TupleJoiner::copyForDiskJoin()
 | 
						|
{
 | 
						|
  std::shared_ptr<TupleJoiner> ret(new TupleJoiner());
 | 
						|
 | 
						|
  ret->smallRG = smallRG;
 | 
						|
  ret->largeRG = largeRG;
 | 
						|
  ret->smallNullMemory = smallNullMemory;
 | 
						|
  ret->smallNullRow = smallNullRow;
 | 
						|
  ret->joinType = joinType;
 | 
						|
  ret->tableName = tableName;
 | 
						|
  ret->typelessJoin = typelessJoin;
 | 
						|
  ret->smallKeyColumns = smallKeyColumns;
 | 
						|
  ret->largeKeyColumns = largeKeyColumns;
 | 
						|
  ret->keyLength = keyLength;
 | 
						|
  ret->bSignedUnsignedJoin = bSignedUnsignedJoin;
 | 
						|
  ret->fe = fe;
 | 
						|
 | 
						|
  ret->nullValueForJoinColumn = nullValueForJoinColumn;
 | 
						|
  ret->uniqueLimit = uniqueLimit;
 | 
						|
 | 
						|
  ret->discreteValues.reset(new bool[smallKeyColumns.size()]);
 | 
						|
  ret->cpValues.reset(new vector<int128_t>[smallKeyColumns.size()]);
 | 
						|
  ret->resourceManager_ = resourceManager_;
 | 
						|
 | 
						|
  for (uint32_t i = 0; i < smallKeyColumns.size(); i++)
 | 
						|
  {
 | 
						|
    ret->discreteValues[i] = false;
 | 
						|
    if (isUnsigned(smallRG.getColTypes()[smallKeyColumns[i]]))
 | 
						|
    {
 | 
						|
      if (datatypes::isWideDecimalType(smallRG.getColType(smallKeyColumns[i]),
 | 
						|
                                       smallRG.getColumnWidth(smallKeyColumns[i])))
 | 
						|
      {
 | 
						|
        ret->cpValues[i].push_back((int128_t)-1);
 | 
						|
        ret->cpValues[i].push_back(0);
 | 
						|
      }
 | 
						|
      else
 | 
						|
      {
 | 
						|
        ret->cpValues[i].push_back((int128_t)numeric_limits<uint64_t>::max());
 | 
						|
        ret->cpValues[i].push_back(0);
 | 
						|
      }
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
      if (datatypes::isWideDecimalType(smallRG.getColType(smallKeyColumns[i]),
 | 
						|
                                       smallRG.getColumnWidth(smallKeyColumns[i])))
 | 
						|
      {
 | 
						|
        ret->cpValues[i].push_back(utils::maxInt128);
 | 
						|
        ret->cpValues[i].push_back(utils::minInt128);
 | 
						|
      }
 | 
						|
      else
 | 
						|
      {
 | 
						|
        ret->cpValues[i].push_back(numeric_limits<int64_t>::max());
 | 
						|
        ret->cpValues[i].push_back(numeric_limits<int64_t>::min());
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  if (typelessJoin)
 | 
						|
  {
 | 
						|
    for (int i = 0; i < numCores; i++)
 | 
						|
    {
 | 
						|
      auto alloc = resourceManager_->getAllocator<utils::FixedAllocatorBufType>();
 | 
						|
      storedKeyAlloc.emplace_back(FixedAllocator(alloc, keyLength));
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  ret->numCores = numCores;
 | 
						|
  ret->bucketCount = bucketCount;
 | 
						|
  ret->bucketMask = bucketMask;
 | 
						|
  ret->jobstepThreadPool = jobstepThreadPool;
 | 
						|
 | 
						|
  ret->setThreadCount(1);
 | 
						|
  ret->clearData();
 | 
						|
  ret->setInUM();
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
// Used for Typeless JOIN to detect if there is a JOIN when largeSide is wide-DECIMAL and
 | 
						|
// smallSide is a smaller data type, e.g. INT or narrow-DECIMAL.
 | 
						|
bool TupleJoiner::joinHasSkewedKeyColumn()
 | 
						|
{
 | 
						|
  std::vector<uint32_t>::const_iterator largeSideKeyColumnsIter = getLargeKeyColumns().begin();
 | 
						|
  std::vector<uint32_t>::const_iterator smallSideKeyColumnsIter = getSmallKeyColumns().begin();
 | 
						|
  idbassert(getLargeKeyColumns().size() == getSmallKeyColumns().size());
 | 
						|
  while (largeSideKeyColumnsIter != getLargeKeyColumns().end())
 | 
						|
  {
 | 
						|
    auto smallSideColumnWidth = smallRG.getColumnWidth(*smallSideKeyColumnsIter);
 | 
						|
    auto largeSideColumnWidth = largeRG.getColumnWidth(*largeSideKeyColumnsIter);
 | 
						|
    bool widthIsDifferent = smallSideColumnWidth != largeSideColumnWidth;
 | 
						|
    if (widthIsDifferent &&
 | 
						|
        (datatypes::isWideDecimalType(smallRG.getColTypes()[*smallSideKeyColumnsIter],
 | 
						|
                                      smallSideColumnWidth) ||
 | 
						|
         datatypes::isWideDecimalType(largeRG.getColTypes()[*largeSideKeyColumnsIter], largeSideColumnWidth)))
 | 
						|
    {
 | 
						|
      return true;
 | 
						|
    }
 | 
						|
    ++largeSideKeyColumnsIter;
 | 
						|
    ++smallSideKeyColumnsIter;
 | 
						|
  }
 | 
						|
  return false;
 | 
						|
}
 | 
						|
 | 
						|
void TupleJoiner::setConvertToDiskJoin()
 | 
						|
{
 | 
						|
  _convertToDiskJoin = true;
 | 
						|
}
 | 
						|
 | 
						|
// The method is made to reuse the code from Typeless TupleJoiner ctor.
 | 
						|
// It is used in the mentioned ctor and in initBPP() to calculate
 | 
						|
// Typeless key length in case of a JOIN when large side column is INT
 | 
						|
// and small side column is wide-DECIMAL.
 | 
						|
// An important assumption is that if the type is DECIMAL than it must
 | 
						|
// be wide-DECIMAL b/c MCS calls the function running Typeless TupleJoiner
 | 
						|
// ctor.
 | 
						|
uint32_t calculateKeyLength(const std::vector<uint32_t>& aKeyColumnsIds,
 | 
						|
                            const rowgroup::RowGroup& aSmallRowGroup,
 | 
						|
                            const std::vector<uint32_t>* aLargeKeyColumnsIds,
 | 
						|
                            const rowgroup::RowGroup* aLargeRowGroup)
 | 
						|
{
 | 
						|
  uint32_t keyLength = 0;
 | 
						|
  for (size_t keyColumnIdx = 0; keyColumnIdx < aKeyColumnsIds.size(); ++keyColumnIdx)
 | 
						|
  {
 | 
						|
    auto smallSideKeyColumnId = aKeyColumnsIds[keyColumnIdx];
 | 
						|
    auto largeSideKeyColumnId = (aLargeRowGroup) ? aLargeKeyColumnsIds->operator[](keyColumnIdx)
 | 
						|
                                                 : std::numeric_limits<uint64_t>::max();
 | 
						|
    const auto& smallKeyColumnType = aSmallRowGroup.getColTypes()[smallSideKeyColumnId];
 | 
						|
    // Not used if aLargeRowGroup is 0 that happens in PrimProc.
 | 
						|
    const auto& largeKeyColumntype = (aLargeRowGroup) ? aLargeRowGroup->getColTypes()[largeSideKeyColumnId]
 | 
						|
                                                      : datatypes::SystemCatalog::UNDEFINED;
 | 
						|
    if (datatypes::isCharType(smallKeyColumnType))
 | 
						|
    {
 | 
						|
      keyLength += aSmallRowGroup.getColumnWidth(smallSideKeyColumnId) + 2;  // +2 for encoded length
 | 
						|
 | 
						|
      // MCOL-698: if we don't do this LONGTEXT allocates 32TB RAM
 | 
						|
      if (keyLength > 65536)
 | 
						|
        return 65536;
 | 
						|
    }
 | 
						|
    else if (datatypes::isLongDouble(smallKeyColumnType))
 | 
						|
    {
 | 
						|
      keyLength += sizeof(long double);
 | 
						|
    }
 | 
						|
    else if (datatypes::isWideDecimalType(smallKeyColumnType,
 | 
						|
                                          aSmallRowGroup.getColumnWidth(smallSideKeyColumnId)))
 | 
						|
    {
 | 
						|
      keyLength += (aLargeRowGroup &&
 | 
						|
                    !datatypes::isWideDecimalType(largeKeyColumntype,
 | 
						|
                                                  aLargeRowGroup->getColumnWidth(smallSideKeyColumnId)))
 | 
						|
                       ? datatypes::MAXLEGACYWIDTH    // Small=Wide, Large=Narrow/xINT
 | 
						|
                       : datatypes::MAXDECIMALWIDTH;  // Small=Wide, Large=Wide
 | 
						|
    }
 | 
						|
    else
 | 
						|
    // The branch covers all datatypes left including skewed DECIMAL JOIN case
 | 
						|
    // Small=Wide, Large=Narrow
 | 
						|
    {
 | 
						|
      keyLength += datatypes::MAXLEGACYWIDTH;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return keyLength;
 | 
						|
}
 | 
						|
 | 
						|
};  // namespace joiner
 |