1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-25 13:17:41 +03:00

Speed up ruleutils' name de-duplication code, and fix overlength-name case.

Since commit 11e131854f, ruleutils.c has
attempted to ensure that each RTE in a query or plan tree has a unique
alias name.  However, the code that was added for this could be quite slow,
even as bad as O(N^3) if N identical RTE names must be replaced, as noted
by Jeff Janes.  Improve matters by building a transient hash table within
set_rtable_names.  The hash table in itself reduces the cost of detecting a
duplicate from O(N) to O(1), and we can save another factor of N by storing
the number of de-duplicated names already created for each entry, so that
we don't have to re-try names already created.  This way is probably a bit
slower overall for small range tables, but almost by definition, such cases
should not be a performance problem.

In principle the same problem applies to the column-name-de-duplication
code; but in practice that seems to be less of a problem, first because
N is limited since we don't support extremely wide tables, and second
because duplicate column names within an RTE are fairly rare, so that in
practice the cost is more like O(N^2) not O(N^3).  It would be very much
messier to fix the column-name code, so for now I've left that alone.

An independent problem in the same area was that the de-duplication code
paid no attention to the identifier length limit, and would happily produce
identifiers that were longer than NAMEDATALEN and wouldn't be unique after
truncation to NAMEDATALEN.  This could result in dump/reload failures, or
perhaps even views that silently behaved differently than before.  We can
fix that by shortening the base name as needed.  Fix it for both the
relation and column name cases.

In passing, check for interrupts in set_rtable_names, just in case it's
still slow enough to be an issue.

Back-patch to 9.3 where this code was introduced.
This commit is contained in:
Tom Lane
2015-11-16 13:45:17 -05:00
parent 7d0e872086
commit faf18a9050
3 changed files with 162 additions and 47 deletions

View File

@@ -36,6 +36,7 @@
#include "commands/tablespace.h"
#include "executor/spi.h"
#include "funcapi.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
@@ -52,6 +53,7 @@
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/hsearch.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/syscache.h"
@@ -260,6 +262,15 @@ typedef struct
#define deparse_columns_fetch(rangetable_index, dpns) \
((deparse_columns *) list_nth((dpns)->rtable_columns, (rangetable_index)-1))
/*
* Entry in set_rtable_names' hash table
*/
typedef struct
{
char name[NAMEDATALEN]; /* Hash key --- must be first */
int counter; /* Largest addition used so far for name */
} NameHashEntry;
/* ----------
* Global data
@@ -304,8 +315,6 @@ static int print_function_arguments(StringInfo buf, HeapTuple proctup,
static void print_function_rettype(StringInfo buf, HeapTuple proctup);
static void set_rtable_names(deparse_namespace *dpns, List *parent_namespaces,
Bitmapset *rels_used);
static bool refname_is_unique(char *refname, deparse_namespace *dpns,
List *parent_namespaces);
static void set_deparse_for_query(deparse_namespace *dpns, Query *query,
List *parent_namespaces);
static void set_simple_column_names(deparse_namespace *dpns);
@@ -2491,15 +2500,61 @@ static void
set_rtable_names(deparse_namespace *dpns, List *parent_namespaces,
Bitmapset *rels_used)
{
HASHCTL hash_ctl;
HTAB *names_hash;
NameHashEntry *hentry;
bool found;
int rtindex;
ListCell *lc;
int rtindex = 1;
dpns->rtable_names = NIL;
/* nothing more to do if empty rtable */
if (dpns->rtable == NIL)
return;
/*
* We use a hash table to hold known names, so that this process is O(N)
* not O(N^2) for N names.
*/
MemSet(&hash_ctl, 0, sizeof(hash_ctl));
hash_ctl.keysize = NAMEDATALEN;
hash_ctl.entrysize = sizeof(NameHashEntry);
hash_ctl.hcxt = CurrentMemoryContext;
names_hash = hash_create("set_rtable_names names",
list_length(dpns->rtable),
&hash_ctl,
HASH_ELEM | HASH_CONTEXT);
/* Preload the hash table with names appearing in parent_namespaces */
foreach(lc, parent_namespaces)
{
deparse_namespace *olddpns = (deparse_namespace *) lfirst(lc);
ListCell *lc2;
foreach(lc2, olddpns->rtable_names)
{
char *oldname = (char *) lfirst(lc2);
if (oldname == NULL)
continue;
hentry = (NameHashEntry *) hash_search(names_hash,
oldname,
HASH_ENTER,
&found);
/* we do not complain about duplicate names in parent namespaces */
hentry->counter = 0;
}
}
/* Now we can scan the rtable */
rtindex = 1;
foreach(lc, dpns->rtable)
{
RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc);
char *refname;
/* Just in case this takes an unreasonable amount of time ... */
CHECK_FOR_INTERRUPTS();
if (rels_used && !bms_is_member(rtindex, rels_used))
{
/* Ignore unreferenced RTE */
@@ -2527,56 +2582,62 @@ set_rtable_names(deparse_namespace *dpns, List *parent_namespaces,
}
/*
* If the selected name isn't unique, append digits to make it so
* If the selected name isn't unique, append digits to make it so, and
* make a new hash entry for it once we've got a unique name. For a
* very long input name, we might have to truncate to stay within
* NAMEDATALEN.
*/
if (refname &&
!refname_is_unique(refname, dpns, parent_namespaces))
if (refname)
{
char *modname = (char *) palloc(strlen(refname) + 32);
int i = 0;
do
hentry = (NameHashEntry *) hash_search(names_hash,
refname,
HASH_ENTER,
&found);
if (found)
{
sprintf(modname, "%s_%d", refname, ++i);
} while (!refname_is_unique(modname, dpns, parent_namespaces));
refname = modname;
/* Name already in use, must choose a new one */
int refnamelen = strlen(refname);
char *modname = (char *) palloc(refnamelen + 16);
NameHashEntry *hentry2;
do
{
hentry->counter++;
for (;;)
{
/*
* We avoid using %.*s here because it can misbehave
* if the data is not valid in what libc thinks is the
* prevailing encoding.
*/
memcpy(modname, refname, refnamelen);
sprintf(modname + refnamelen, "_%d", hentry->counter);
if (strlen(modname) < NAMEDATALEN)
break;
/* drop chars from refname to keep all the digits */
refnamelen = pg_mbcliplen(refname, refnamelen,
refnamelen - 1);
}
hentry2 = (NameHashEntry *) hash_search(names_hash,
modname,
HASH_ENTER,
&found);
} while (found);
hentry2->counter = 0; /* init new hash entry */
refname = modname;
}
else
{
/* Name not previously used, need only initialize hentry */
hentry->counter = 0;
}
}
dpns->rtable_names = lappend(dpns->rtable_names, refname);
rtindex++;
}
}
/*
* refname_is_unique: is refname distinct from all already-chosen RTE names?
*/
static bool
refname_is_unique(char *refname, deparse_namespace *dpns,
List *parent_namespaces)
{
ListCell *lc;
foreach(lc, dpns->rtable_names)
{
char *oldname = (char *) lfirst(lc);
if (oldname && strcmp(oldname, refname) == 0)
return false;
}
foreach(lc, parent_namespaces)
{
deparse_namespace *olddpns = (deparse_namespace *) lfirst(lc);
ListCell *lc2;
foreach(lc2, olddpns->rtable_names)
{
char *oldname = (char *) lfirst(lc2);
if (oldname && strcmp(oldname, refname) == 0)
return false;
}
}
return true;
hash_destroy(names_hash);
}
/*
@@ -3404,16 +3465,34 @@ make_colname_unique(char *colname, deparse_namespace *dpns,
deparse_columns *colinfo)
{
/*
* If the selected name isn't unique, append digits to make it so
* If the selected name isn't unique, append digits to make it so. For a
* very long input name, we might have to truncate to stay within
* NAMEDATALEN.
*/
if (!colname_is_unique(colname, dpns, colinfo))
{
char *modname = (char *) palloc(strlen(colname) + 32);
int colnamelen = strlen(colname);
char *modname = (char *) palloc(colnamelen + 16);
int i = 0;
do
{
sprintf(modname, "%s_%d", colname, ++i);
i++;
for (;;)
{
/*
* We avoid using %.*s here because it can misbehave if the
* data is not valid in what libc thinks is the prevailing
* encoding.
*/
memcpy(modname, colname, colnamelen);
sprintf(modname + colnamelen, "_%d", i);
if (strlen(modname) < NAMEDATALEN)
break;
/* drop chars from colname to keep all the digits */
colnamelen = pg_mbcliplen(colname, colnamelen,
colnamelen - 1);
}
} while (!colname_is_unique(modname, dpns, colinfo));
colname = modname;
}