mirror of https://github.com/MariaDB/server.git
5.5 merge
 sql/uniques.cc | 193
@@ -77,7 +77,10 @@ int unique_intersect_write_to_ptrs(uchar* key, element_count count, Unique *uniq
 Unique::Unique(qsort_cmp2 comp_func, void * comp_func_fixed_arg,
                uint size_arg, ulonglong max_in_memory_size_arg,
                uint min_dupl_count_arg)
-  :max_in_memory_size(max_in_memory_size_arg), size(size_arg), elements(0)
+  :max_in_memory_size(max_in_memory_size_arg),
+   record_pointers(NULL),
+   size(size_arg),
+   elements(0)
 {
   min_dupl_count= min_dupl_count_arg;
   full_size= size;
@@ -582,6 +585,7 @@ end:
   SYNOPSIS
     Unique:walk()
   All params are 'IN':
+    table   parameter for the call of the merge method
     action  function-visitor, typed in include/my_tree.h
             function is called for each unique element
     arg     argument for visitor, which is passed to it on each call
@@ -590,31 +594,122 @@ end:
     <> 0 error
 */
 
-bool Unique::walk(tree_walk_action action, void *walk_action_arg)
+bool Unique::walk(TABLE *table, tree_walk_action action, void *walk_action_arg)
 {
-  int res;
+  int res= 0;
   uchar *merge_buffer;
 
   if (elements == 0)                       /* the whole tree is in memory */
     return tree_walk(&tree, action, walk_action_arg, left_root_right);
 
+  table->sort.found_records=elements+tree.elements_in_tree;
   /* flush current tree to the file to have some memory for merge buffer */
   if (flush())
     return 1;
   if (flush_io_cache(&file) || reinit_io_cache(&file, READ_CACHE, 0L, 0, 0))
     return 1;
-  if (!(merge_buffer= (uchar *) my_malloc((ulong) max_in_memory_size,
-                                          MYF(MY_THREAD_SPECIFIC))))
+  ulong buff_sz= (max_in_memory_size / full_size + 1) * full_size;
+  if (!(merge_buffer= (uchar *) my_malloc(buff_sz, MYF(MY_THREAD_SPECIFIC))))
     return 1;
-  res= merge_walk(merge_buffer, (ulong) max_in_memory_size, size,
-                  (BUFFPEK *) file_ptrs.buffer,
-                  (BUFFPEK *) file_ptrs.buffer + file_ptrs.elements,
-                  action, walk_action_arg,
-                  tree.compare, tree.custom_arg, &file);
+  if (buff_sz < full_size * (file_ptrs.elements + 1UL))
+    res= merge(table, merge_buffer, buff_sz >= full_size * MERGEBUFF2) ;
+
+  if (!res)
+  {
+    res= merge_walk(merge_buffer, (ulong) max_in_memory_size, full_size,
+                    (BUFFPEK *) file_ptrs.buffer,
+                    (BUFFPEK *) file_ptrs.buffer + file_ptrs.elements,
+                    action, walk_action_arg,
+                    tree.compare, tree.custom_arg, &file);
+  }
   my_free(merge_buffer);
   return res;
 }
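A standalone C++ sketch of the two size calculations in the rewritten Unique::walk() above (the concrete numbers are assumed examples, not taken from MariaDB): buff_sz rounds the merge buffer up to a whole number of full_size-byte keys, and the pre-merge via merge() is triggered only when that buffer cannot hold one key per on-disk chunk plus one spare slot.

    // Sketch only: models the arithmetic used in Unique::walk(), not MariaDB code.
    #include <cstdio>

    static bool needs_pre_merge(unsigned long buff_sz, unsigned long full_size,
                                unsigned long n_chunks)
    {
      /* same shape as: buff_sz < full_size * (file_ptrs.elements + 1UL) */
      return buff_sz < full_size * (n_chunks + 1UL);
    }

    int main()
    {
      unsigned long long max_in_memory_size= 1024;  /* assumed limit in bytes */
      unsigned long full_size= 24;                  /* assumed key size */

      /* round up to a whole number of keys, at least one key above the limit */
      unsigned long buff_sz=
        (unsigned long) ((max_in_memory_size / full_size + 1) * full_size);
      printf("buff_sz= %lu bytes = %lu keys\n", buff_sz, buff_sz / full_size);
      /* prints: buff_sz= 1032 bytes = 43 keys */

      printf("10 chunks  -> pre-merge: %d\n",
             needs_pre_merge(buff_sz, full_size, 10));  /* 0: 43 keys >= 11 */
      printf("100 chunks -> pre-merge: %d\n",
             needs_pre_merge(buff_sz, full_size, 100)); /* 1: 43 keys < 101 */
      return 0;
    }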
+
+
+/*
+  DESCRIPTION
+    Perform multi-pass sort merge of the elements accessed through table->sort,
+    using the buffer buff as the merge buffer. The last pass is not performed
+    if without_last_merge is TRUE.
+  SYNOPSIS
+    Unique:merge()
+  All params are 'IN':
+    table               the parameter to access sort context
+    buff                merge buffer
+    without_last_merge  TRUE <=> do not perform the last merge
+  RETURN VALUE
+    0    OK
+    <> 0 error
+*/
+
+bool Unique::merge(TABLE *table, uchar *buff, bool without_last_merge)
+{
+  IO_CACHE *outfile= table->sort.io_cache;
+  BUFFPEK *file_ptr= (BUFFPEK*) file_ptrs.buffer;
+  uint maxbuffer= file_ptrs.elements - 1;
+  my_off_t save_pos;
+  bool error= 1;
+
+  /* Open cached file if it isn't open */
+  if (!outfile)
+    outfile= table->sort.io_cache= (IO_CACHE*) my_malloc(sizeof(IO_CACHE),
+                                     MYF(MY_THREAD_SPECIFIC|MY_ZEROFILL));
+  if (!outfile ||
+      (! my_b_inited(outfile) &&
+       open_cached_file(outfile,mysql_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER,
+                        MYF(MY_WME))))
+    return 1;
+  reinit_io_cache(outfile,WRITE_CACHE,0L,0,0);
+
+  Sort_param sort_param;
+  bzero((char*) &sort_param,sizeof(sort_param));
+  sort_param.max_rows= elements;
+  sort_param.sort_form= table;
+  sort_param.rec_length= sort_param.sort_length= sort_param.ref_length=
+    full_size;
+  sort_param.min_dupl_count= min_dupl_count;
+  sort_param.res_length= 0;
+  sort_param.max_keys_per_buffer=
+    (uint) (max_in_memory_size / sort_param.sort_length);
+  sort_param.not_killable= 1;
+
+  sort_param.unique_buff= buff +(sort_param.max_keys_per_buffer *
+                                 sort_param.sort_length);
+
+  sort_param.compare= (qsort2_cmp) buffpek_compare;
+  sort_param.cmp_context.key_compare= tree.compare;
+  sort_param.cmp_context.key_compare_arg= tree.custom_arg;
+
+  /* Merge the buffers to one file, removing duplicates */
+  if (merge_many_buff(&sort_param,buff,file_ptr,&maxbuffer,&file))
+    goto err;
+  if (flush_io_cache(&file) ||
+      reinit_io_cache(&file,READ_CACHE,0L,0,0))
+    goto err;
+  sort_param.res_length= sort_param.rec_length-
+                         (min_dupl_count ? sizeof(min_dupl_count) : 0);
+  if (without_last_merge)
+  {
+    file_ptrs.elements= maxbuffer+1;
+    return 0;
+  }
+  if (merge_index(&sort_param, buff, file_ptr, maxbuffer, &file, outfile))
+    goto err;
+  error= 0;
+err:
+  if (flush_io_cache(outfile))
+    error= 1;
+
+  /* Setup io_cache for reading */
+  save_pos= outfile->pos_in_file;
+  if (reinit_io_cache(outfile,READ_CACHE,0L,0,0))
+    error= 1;
+  outfile->end_of_file=save_pos;
+  return error;
+}
+
 
 /*
   Modify the TABLE element so that when one calls init_records()
   the rows will be read in priority order.
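For the sort_param.res_length assignment in the new Unique::merge() above, a small standalone illustration (sizes are assumed; that the counter is appended to each key is inferred from the sizeof(min_dupl_count) term in the hunk): while chunks are being merged, each key carries a trailing duplicate counter, and that counter is dropped from the records written to the output cache, so the result length is the record length minus sizeof(min_dupl_count) whenever counters are in use.

    // Sketch only: the res_length arithmetic with assumed sizes.
    #include <cstdio>

    int main()
    {
      unsigned long min_dupl_count= 2;   /* non-zero: duplicate counters kept */
      size_t key_size= 16;               /* assumed packed key size */
      size_t rec_length= key_size + sizeof(min_dupl_count); /* key + counter */

      size_t res_length=
        rec_length - (min_dupl_count ? sizeof(min_dupl_count) : 0);
      printf("rec_length=%zu res_length=%zu\n", rec_length, res_length);
      /* with an 8-byte unsigned long: rec_length=24 res_length=16 */
      return 0;
    }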
@@ -622,7 +717,10 @@ bool Unique::walk(tree_walk_action action, void *walk_action_arg)
 bool Unique::get(TABLE *table)
 {
-  table->sort.found_records=elements+tree.elements_in_tree;
+  bool rc= 1;
+  uchar *sort_buffer= NULL;
+  table->sort.found_records= elements+tree.elements_in_tree;
 
   if (my_b_tell(&file) == 0)
   {
     /* Whole tree is in memory; Don't use disk if you don't need to */
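The two locals added above let Unique::get() funnel every failure into one cleanup label, as the shortened hunk below shows: rc starts as failure and sort_buffer as NULL, so the eventual my_free(sort_buffer) stays safe on early exits (assuming my_free(), like free(), tolerates a NULL pointer). A standalone sketch of that idiom, with hypothetical names and a simulated failure:

    // Sketch only: init-then-single-cleanup, the shape of the new Unique::get().
    #include <cstdio>
    #include <cstdlib>

    static bool get_like(bool merge_fails)
    {
      bool rc= true;                     /* assume failure until the end */
      unsigned char *sort_buffer= NULL;  /* NULL keeps the cleanup safe */

      sort_buffer= (unsigned char*) malloc(1024);
      if (!sort_buffer)
        return true;

      if (merge_fails)                   /* stands in for merge(...) failing */
        goto err;

      rc= false;                         /* success */
    err:
      free(sort_buffer);                 /* single cleanup path */
      return rc;
    }

    int main()
    {
      printf("ok path:   %d\n", (int) get_like(false));  /* 0 */
      printf("fail path: %d\n", (int) get_like(true));   /* 1 */
      return 0;
    }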
@@ -642,71 +740,16 @@ bool Unique::get(TABLE *table)
   /* Not enough memory; Save the result to file && free memory used by tree */
   if (flush())
     return 1;
-
-  IO_CACHE *outfile=table->sort.io_cache;
-  BUFFPEK *file_ptr= (BUFFPEK*) file_ptrs.buffer;
-  uint maxbuffer= file_ptrs.elements - 1;
-  uchar *sort_buffer;
-  my_off_t save_pos;
-  bool error=1;
-
-  /* Open cached file if it isn't open */
-  outfile=table->sort.io_cache=(IO_CACHE*) my_malloc(sizeof(IO_CACHE),
-                                MYF(MY_ZEROFILL |
-                                    MY_THREAD_SPECIFIC));
-
-  if (!outfile ||
-      (! my_b_inited(outfile) &&
-       open_cached_file(outfile,mysql_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER,
-                        MYF(MY_WME))))
-    return 1;
-  reinit_io_cache(outfile,WRITE_CACHE,0L,0,0);
-
-  Sort_param sort_param;
-  bzero((char*) &sort_param,sizeof(sort_param));
-  sort_param.max_rows= elements;
-  sort_param.sort_form=table;
-  sort_param.rec_length= sort_param.sort_length= sort_param.ref_length=
-    full_size;
-  sort_param.min_dupl_count= min_dupl_count;
-  sort_param.res_length= 0;
-  sort_param.max_keys_per_buffer=
-    (uint) (max_in_memory_size / sort_param.sort_length);
-  sort_param.not_killable=1;
-
-  if (!(sort_buffer=(uchar*) my_malloc((sort_param.max_keys_per_buffer+1) *
-                                       sort_param.sort_length,
-                                       MYF(MY_THREAD_SPECIFIC))))
-    return 1;
-  sort_param.unique_buff= sort_buffer+(sort_param.max_keys_per_buffer *
-                                       sort_param.sort_length);
-
-  sort_param.compare= (qsort2_cmp) buffpek_compare;
-  sort_param.cmp_context.key_compare= tree.compare;
-  sort_param.cmp_context.key_compare_arg= tree.custom_arg;
-
-  /* Merge the buffers to one file, removing duplicates */
-  if (merge_many_buff(&sort_param,sort_buffer,file_ptr,&maxbuffer,&file))
-    goto err;
-  if (flush_io_cache(&file) ||
-      reinit_io_cache(&file,READ_CACHE,0L,0,0))
-    goto err;
-  sort_param.res_length= sort_param.rec_length-
-                         (min_dupl_count ? sizeof(min_dupl_count) : 0);
-  if (merge_index(&sort_param, sort_buffer, file_ptr, maxbuffer, &file, outfile))
-    goto err;
-  error=0;
-err:
-  my_free(sort_buffer);
-  if (flush_io_cache(outfile))
-    error=1;
-
-  /* Setup io_cache for reading */
-  save_pos=outfile->pos_in_file;
-  if (reinit_io_cache(outfile,READ_CACHE,0L,0,0))
-    error=1;
-  outfile->end_of_file=save_pos;
-  return error;
+  ulong buff_sz= (max_in_memory_size / full_size + 1) * full_size;
+  if (!(sort_buffer= (uchar*) my_malloc(buff_sz, MYF(MY_THREAD_SPECIFIC))))
+    return 1;
+
+  if (merge(table, sort_buffer, FALSE))
+    goto err;
+  rc= 0;
+err:
+  my_free(sort_buffer);
+  return rc;
 }