1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
Denis Khalikov 1f190a6e75 MCOL-5477 Disk join step improvement.
This patch:
1. Handles corner case when the bucket exceeded the memory limit, but we cannot redistribute the data in this bucket into new buckets based on a hash algorithm, because the rows have the same values.
2. Adds force option for disk join step.
3. Add a option to contol the depth of the partition tree.
2023-06-23 18:40:15 +03:00

183 lines
6.0 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#pragma once
#include "rowgroup.h"
#include "hasher.h"
#include "idbcompress.h"
#include <vector>
#include <fstream>
#include <boost/thread.hpp>
namespace joiner
{
class JoinPartition
{
public:
JoinPartition();
JoinPartition(const rowgroup::RowGroup& largeRG, const rowgroup::RowGroup& smallRG,
const std::vector<uint32_t>& smallkeyCols, const std::vector<uint32_t>& largeKeyCols,
bool typeless, bool isAntiWithMatchNulls, bool hasFEFilter, uint64_t totalUMMemory,
uint64_t partitionSize, uint32_t maxPartitionTreeDepth);
JoinPartition(const JoinPartition&, bool splitMode, uint32_t depth);
virtual ~JoinPartition();
// For now, the root node will use the RGData interface, the branches & leaves use
// only the Row interface.
int64_t insertSmallSideRow(const rowgroup::Row& row);
int64_t insertSmallSideRGData(rowgroup::RGData&);
// note, the vector version of this fcn frees the input RGDatas as it goes
int64_t insertSmallSideRGData(std::vector<rowgroup::RGData>&);
int64_t doneInsertingSmallData();
int64_t insertLargeSideRGData(rowgroup::RGData&);
int64_t insertLargeSideRow(const rowgroup::Row& row);
int64_t doneInsertingLargeData();
/* Returns true if there are more partitions to fetch, false otherwise */
bool getNextPartition(std::vector<rowgroup::RGData>* smallData, uint64_t* partitionID, JoinPartition** jp);
void collectJoinPartitions(std::vector<JoinPartition*>& joinPartitions);
boost::shared_ptr<rowgroup::RGData> getNextLargeRGData();
/* It's important to follow the sequence of operations to maintain the correct
internal state. Right now it doesn't check that you the programmer are doing things
right, it'll likely fail queries or crash if you do things wrong.
This should be made simpler at some point.
On construction, the JP is config'd for small-side reading.
After that's done, call doneInsertingSmallData() and initForLargeSideFeed().
Then, insert the large-side data. When done, call doneInsertingLargeData()
and initForProcessing().
In the processing phase, use getNextPartition() and getNextLargeRGData()
to get the data back out. After processing all partitions, if it's necessary
to process more iterations of the large side, call initForProcessing() again, and
continue as before.
*/
/* Call this before reading into the large side */
void initForLargeSideFeed();
/* Call this between large-side insertion & join processing */
void initForProcessing();
/* Small outer joins need to retain some state after each large-side iteration */
void saveSmallSidePartition(std::vector<rowgroup::RGData>& rgdata);
/* each JP instance stores the sizes of every JP instance below it, so root node has the total. */
int64_t getCurrentDiskUsage()
{
return smallSizeOnDisk + largeSizeOnDisk;
}
int64_t getSmallSideDiskUsage()
{
return smallSizeOnDisk;
}
int64_t getLargeSideDiskUsage()
{
return largeSizeOnDisk;
}
uint64_t getBytesRead();
uint64_t getBytesWritten();
uint64_t getMaxLargeSize()
{
return maxLargeSize;
}
uint64_t getMaxSmallSize()
{
return maxSmallSize;
}
void readByteStream(int which, messageqcpp::ByteStream* bs);
uint64_t getUniqueID()
{
return uniqueID;
}
void setNextSmallOffset(size_t offset)
{
nextSmallOffset = offset;
}
protected:
private:
void initBuffers();
int64_t convertToSplitMode();
bool canConvertToSplitMode();
int64_t processSmallBuffer();
int64_t processLargeBuffer();
int64_t processSmallBuffer(rowgroup::RGData&);
int64_t processLargeBuffer(rowgroup::RGData&);
rowgroup::RowGroup smallRG;
rowgroup::RowGroup largeRG;
std::vector<uint32_t> smallKeyCols;
std::vector<uint32_t> largeKeyCols;
bool typelessJoin;
uint32_t hashSeed;
std::vector<boost::shared_ptr<JoinPartition> > buckets;
uint32_t bucketCount; // = TotalUMMem / htTargetSize
bool fileMode;
std::fstream smallFile;
std::fstream largeFile;
std::string filenamePrefix;
std::string smallFilename;
std::string largeFilename;
rowgroup::RGData buffer;
rowgroup::Row smallRow;
rowgroup::Row largeRow;
uint32_t nextPartitionToReturn;
uint64_t htSizeEstimate;
uint64_t htTargetSize;
uint64_t uniqueID;
uint64_t smallSizeOnDisk;
uint64_t largeSizeOnDisk;
utils::Hasher_r hasher;
bool rootNode;
bool canSplit;
/* Not-in antijoin hack. A small-side row with a null join column has to go into every partition or
into one always resident partition (TBD).
If an F&E filter exists, it needs all null rows, if not, it only needs one. */
bool antiWithMatchNulls;
bool needsAllNullRows;
bool gotNullRow;
bool hasNullJoinColumn(rowgroup::Row&);
// which = 0 -> smallFile, which = 1 -> largeFile
uint64_t writeByteStream(int which, messageqcpp::ByteStream& bs);
/* Compression support */
bool useCompression;
std::shared_ptr<compress::CompressInterface> compressor;
/* TBD: do the reading/writing in one thread, compression/decompression in another */
/* Some stats for reporting */
uint64_t totalBytesRead, totalBytesWritten;
uint64_t maxLargeSize, maxSmallSize;
/* file descriptor reduction */
size_t nextSmallOffset;
size_t nextLargeOffset;
// Options to control partition tree depth.
uint32_t currentPartitionTreeDepth;
uint32_t maxPartitionTreeDepth;
};
} // namespace joiner