You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-11-03 17:13:17 +03:00 
			
		
		
		
	feat(rbo) parallel factor via session veriable
This commit is contained in:
		@@ -7601,7 +7601,7 @@ int cs_get_select_plan(ha_columnstore_select_handler* handler, THD* thd, SCSEP&
 | 
				
			|||||||
  derivedTableOptimization(&gwi, csep);
 | 
					  derivedTableOptimization(&gwi, csep);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  {
 | 
					  {
 | 
				
			||||||
    optimizer::RBOptimizerContext ctx(gwi, *thd, csep->traceOn());
 | 
					    optimizer::RBOptimizerContext ctx(gwi, *thd, csep->traceOn(), get_ces_optimization_parallel_factor(thd));
 | 
				
			||||||
    // TODO RBO can crash or fail leaving CSEP in an invalid state, so there must be a valid CSEP copy
 | 
					    // TODO RBO can crash or fail leaving CSEP in an invalid state, so there must be a valid CSEP copy
 | 
				
			||||||
    // TBD There is a tradeoff b/w copy per rule and copy per optimizer run.
 | 
					    // TBD There is a tradeoff b/w copy per rule and copy per optimizer run.
 | 
				
			||||||
    bool csepWasOptimized = optimizer::optimizeCSEP(*csep, ctx, get_unstable_optimizer(&ctx.thd));
 | 
					    bool csepWasOptimized = optimizer::optimizeCSEP(*csep, ctx, get_unstable_optimizer(&ctx.thd));
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -85,6 +85,12 @@ static MYSQL_THDVAR_UINT(orderby_threads, PLUGIN_VAR_RQCMDARG,
 | 
				
			|||||||
                         "Number of parallel threads used by ORDER BY. (default to 16)", NULL, NULL, 16, 0,
 | 
					                         "Number of parallel threads used by ORDER BY. (default to 16)", NULL, NULL, 16, 0,
 | 
				
			||||||
                         2048, 1);
 | 
					                         2048, 1);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static constexpr uint DEFAULT_CES_OPTIMIZATION_PARALLEL_FACTOR = 50;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static MYSQL_THDVAR_UINT(ces_optimization_parallel_factor, PLUGIN_VAR_RQCMDARG,
 | 
				
			||||||
 | 
					                         "Maximum parallel factor for parallel CES optimization. (default to 50)", NULL, NULL, DEFAULT_CES_OPTIMIZATION_PARALLEL_FACTOR, 1,
 | 
				
			||||||
 | 
					                         1000, 1);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// legacy system variables
 | 
					// legacy system variables
 | 
				
			||||||
static MYSQL_THDVAR_ULONG(decimal_scale, PLUGIN_VAR_RQCMDARG,
 | 
					static MYSQL_THDVAR_ULONG(decimal_scale, PLUGIN_VAR_RQCMDARG,
 | 
				
			||||||
                          "The default decimal precision for calculated column sub-operations ", NULL, NULL,
 | 
					                          "The default decimal precision for calculated column sub-operations ", NULL, NULL,
 | 
				
			||||||
@@ -236,6 +242,7 @@ st_mysql_sys_var* mcs_system_variables[] = {
 | 
				
			|||||||
    MYSQL_SYSVAR(derived_handler),
 | 
					    MYSQL_SYSVAR(derived_handler),
 | 
				
			||||||
    MYSQL_SYSVAR(select_handler_in_stored_procedures),
 | 
					    MYSQL_SYSVAR(select_handler_in_stored_procedures),
 | 
				
			||||||
    MYSQL_SYSVAR(orderby_threads),
 | 
					    MYSQL_SYSVAR(orderby_threads),
 | 
				
			||||||
 | 
					    MYSQL_SYSVAR(ces_optimization_parallel_factor),
 | 
				
			||||||
    MYSQL_SYSVAR(decimal_scale),
 | 
					    MYSQL_SYSVAR(decimal_scale),
 | 
				
			||||||
    MYSQL_SYSVAR(use_decimal_scale),
 | 
					    MYSQL_SYSVAR(use_decimal_scale),
 | 
				
			||||||
    MYSQL_SYSVAR(ordered_only),
 | 
					    MYSQL_SYSVAR(ordered_only),
 | 
				
			||||||
@@ -368,6 +375,15 @@ void set_orderby_threads(THD* thd, uint value)
 | 
				
			|||||||
  THDVAR(thd, orderby_threads) = value;
 | 
					  THDVAR(thd, orderby_threads) = value;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					uint get_ces_optimization_parallel_factor(THD* thd)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  return (thd == NULL) ? DEFAULT_CES_OPTIMIZATION_PARALLEL_FACTOR : THDVAR(thd, ces_optimization_parallel_factor);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					void set_ces_optimization_parallel_factor(THD* thd, uint value)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  THDVAR(thd, ces_optimization_parallel_factor) = value;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
bool get_use_decimal_scale(THD* thd)
 | 
					bool get_use_decimal_scale(THD* thd)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  return (thd == NULL) ? false : THDVAR(thd, use_decimal_scale);
 | 
					  return (thd == NULL) ? false : THDVAR(thd, use_decimal_scale);
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -81,6 +81,9 @@ void set_select_handler_in_stored_procedures(THD* thd, bool value);
 | 
				
			|||||||
uint get_orderby_threads(THD* thd);
 | 
					uint get_orderby_threads(THD* thd);
 | 
				
			||||||
void set_orderby_threads(THD* thd, uint value);
 | 
					void set_orderby_threads(THD* thd, uint value);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					uint get_ces_optimization_parallel_factor(THD* thd);
 | 
				
			||||||
 | 
					void set_ces_optimization_parallel_factor(THD* thd, uint value);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
bool get_use_decimal_scale(THD* thd);
 | 
					bool get_use_decimal_scale(THD* thd);
 | 
				
			||||||
void set_use_decimal_scale(THD* thd, bool value);
 | 
					void set_use_decimal_scale(THD* thd, bool value);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -35,6 +35,7 @@
 | 
				
			|||||||
#include "returnedcolumn.h"
 | 
					#include "returnedcolumn.h"
 | 
				
			||||||
#include "simplefilter.h"
 | 
					#include "simplefilter.h"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
namespace optimizer
 | 
					namespace optimizer
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -45,7 +46,6 @@ using SCAndItsProjectionPosition = std::pair<execplan::SimpleColumn*, uint32_t>;
 | 
				
			|||||||
using SCsAndTheirProjectionPositions = std::vector<SCAndItsProjectionPosition>;
 | 
					using SCsAndTheirProjectionPositions = std::vector<SCAndItsProjectionPosition>;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static const std::string RewrittenSubTableAliasPrefix = "$added_sub_";
 | 
					static const std::string RewrittenSubTableAliasPrefix = "$added_sub_";
 | 
				
			||||||
static const size_t MaxParallelFactor = 50;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
namespace details
 | 
					namespace details
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
@@ -233,7 +233,7 @@ bool parallelCESFilter(execplan::CalpontSelectExecutionPlan& csep, optimizer::RB
 | 
				
			|||||||
// Populates range bounds based on column statistics
 | 
					// Populates range bounds based on column statistics
 | 
				
			||||||
// Returns optional with bounds if successful, nullopt otherwise
 | 
					// Returns optional with bounds if successful, nullopt otherwise
 | 
				
			||||||
template <typename T>
 | 
					template <typename T>
 | 
				
			||||||
std::optional<details::FilterRangeBounds<T>> populateRangeBounds(Histogram_json_hb* columnStatistics)
 | 
					std::optional<details::FilterRangeBounds<T>> populateRangeBounds(Histogram_json_hb* columnStatistics, optimizer::RBOptimizerContext& ctx)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  details::FilterRangeBounds<T> bounds;
 | 
					  details::FilterRangeBounds<T> bounds;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -250,11 +250,12 @@ std::optional<details::FilterRangeBounds<T>> populateRangeBounds(Histogram_json_
 | 
				
			|||||||
    return v;
 | 
					    return v;
 | 
				
			||||||
  };
 | 
					  };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  // TODO configurable parallel factor via session variable
 | 
					  // Get parallel factor from context
 | 
				
			||||||
  // NB now histogram size is the way to control parallel factor with 16 being the maximum
 | 
					  size_t maxParallelFactor = ctx.cesOptimizationParallelFactor;
 | 
				
			||||||
  std::cout << "populateRangeBounds() columnStatistics->buckets.size() "
 | 
					  std::cout << "populateRangeBounds() columnStatistics->buckets.size() "
 | 
				
			||||||
            << columnStatistics->get_json_histogram().size() << std::endl;
 | 
					            << columnStatistics->get_json_histogram().size() << std::endl;
 | 
				
			||||||
  size_t numberOfUnionUnits = std::min(columnStatistics->get_json_histogram().size(), MaxParallelFactor);
 | 
					  std::cout << "Session ces_optimization_parallel_factor: " << maxParallelFactor << std::endl;
 | 
				
			||||||
 | 
					  size_t numberOfUnionUnits = std::min(columnStatistics->get_json_histogram().size(), maxParallelFactor);
 | 
				
			||||||
  size_t numberOfBucketsPerUnionUnit = columnStatistics->get_json_histogram().size() / numberOfUnionUnits;
 | 
					  size_t numberOfBucketsPerUnionUnit = columnStatistics->get_json_histogram().size() / numberOfUnionUnits;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  std::cout << "Number of union units: " << numberOfUnionUnits << std::endl;
 | 
					  std::cout << "Number of union units: " << numberOfUnionUnits << std::endl;
 | 
				
			||||||
@@ -334,7 +335,7 @@ execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable(
 | 
				
			|||||||
  std::cout << "makeUnionFromTable RC front " << csep.returnedCols().front()->toString() << std::endl;
 | 
					  std::cout << "makeUnionFromTable RC front " << csep.returnedCols().front()->toString() << std::endl;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  // TODO char and other numerical types support
 | 
					  // TODO char and other numerical types support
 | 
				
			||||||
  auto boundsOpt = populateRangeBounds<uint64_t>(columnStatistics);
 | 
					  auto boundsOpt = populateRangeBounds<uint64_t>(columnStatistics, ctx);
 | 
				
			||||||
  if (!boundsOpt.has_value())
 | 
					  if (!boundsOpt.has_value())
 | 
				
			||||||
  {
 | 
					  {
 | 
				
			||||||
    return unionVec;
 | 
					    return unionVec;
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -34,8 +34,8 @@ class RBOptimizerContext
 | 
				
			|||||||
{
 | 
					{
 | 
				
			||||||
 public:
 | 
					 public:
 | 
				
			||||||
  RBOptimizerContext() = delete;
 | 
					  RBOptimizerContext() = delete;
 | 
				
			||||||
  RBOptimizerContext(cal_impl_if::gp_walk_info& walk_info, THD& thd, bool logRules)
 | 
					  RBOptimizerContext(cal_impl_if::gp_walk_info& walk_info, THD& thd, bool logRules, uint cesOptimizationParallelFactor = 50)
 | 
				
			||||||
   : gwi(walk_info), thd(thd), logRules(logRules)
 | 
					   : gwi(walk_info), thd(thd), logRules(logRules), cesOptimizationParallelFactor(cesOptimizationParallelFactor)
 | 
				
			||||||
  {
 | 
					  {
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  // gwi lifetime should be longer than optimizer context.
 | 
					  // gwi lifetime should be longer than optimizer context.
 | 
				
			||||||
@@ -44,6 +44,7 @@ class RBOptimizerContext
 | 
				
			|||||||
  THD& thd;
 | 
					  THD& thd;
 | 
				
			||||||
  uint64_t uniqueId{0};
 | 
					  uint64_t uniqueId{0};
 | 
				
			||||||
  bool logRules{false};
 | 
					  bool logRules{false};
 | 
				
			||||||
 | 
					  uint cesOptimizationParallelFactor;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
struct Rule
 | 
					struct Rule
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user