/*-------------------------------------------------------------------------
 *
 * nodeMemoize.c
 *	  Routines to handle caching of results from parameterized nodes
 *
 * Portions Copyright (c) 2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeMemoize.c
 *
 * Memoize nodes are intended to sit above parameterized nodes in the plan
 * tree in order to cache results from them.  The intention here is that a
 * repeat scan with a parameter value that has already been seen by the node
 * can fetch tuples from the cache rather than having to re-scan the outer
 * node all over again.  The query planner may choose to make use of one of
 * these when it thinks rescans for previously seen values are likely enough
 * to warrant adding the additional node.
 *
 * The method of cache we use is a hash table.  When the cache fills, we never
 * spill tuples to disk, instead, we choose to evict the least recently used
 * cache entry from the cache.  We remember the least recently used entry by
 * always pushing new entries and entries we look for onto the tail of a
 * doubly linked list.  This means that older items always bubble to the top
 * of this LRU list.
 *
 * Sometimes our callers won't run their scans to completion. For example a
 * semi-join only needs to run until it finds a matching tuple, and once it
 * does, the join operator skips to the next outer tuple and does not execute
 * the inner side again on that scan. Because of this, we must keep track of
 * when a cache entry is complete, and by default, we know it is when we run
 * out of tuples to read during the scan. However, there are cases where we
 * can mark the cache entry as complete without exhausting the scan of all
 * tuples. One case is unique joins, where the join operator knows that there
 * will only be at most one match for any given outer tuple. In order to
 * support such cases we allow the "singlerow" option to be set for the cache.
 * This option marks the cache entry as complete after we read the first tuple
 * from the subnode.
 *
 * It's possible when we're filling the cache for a given set of parameters
 * that we're unable to free enough memory to store any more tuples. If this
 * happens then we'll have already evicted all other cache entries. When
 * caching another tuple would cause us to exceed our memory budget, we must
 * free the entry that we're currently populating and move the state machine
 * into MEMO_CACHE_BYPASS_MODE. This means that we'll not attempt to cache
 * any further tuples for this particular scan. We don't have the memory for
 * it. The state machine will be reset again on the next rescan. If the
 * memory requirements to cache the next parameter's tuples are less
 * demanding, then that may allow us to start putting useful entries back into
 * the cache again.
 *
 *
 * INTERFACE ROUTINES
 *		ExecMemoize			- lookup cache, exec subplan when not found
 *		ExecInitMemoize		- initialize node and subnodes
 *		ExecEndMemoize		- shutdown node and subnodes
 *		ExecReScanMemoize	- rescan the memoize node
 *
 *		ExecMemoizeEstimate		estimates DSM space needed for parallel plan
 *		ExecMemoizeInitializeDSM	initialize DSM for parallel plan
 *		ExecMemoizeInitializeWorker	attach to DSM info in parallel worker
 *		ExecMemoizeRetrieveInstrumentation	get instrumentation from worker
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "common/hashfn.h"
#include "executor/executor.h"
#include "executor/nodeMemoize.h"
#include "lib/ilist.h"
#include "miscadmin.h"
#include "utils/datum.h"
#include "utils/lsyscache.h"

/* States of the ExecMemoize state machine */
#define MEMO_CACHE_LOOKUP			1	/* Attempt to perform a cache lookup */
#define MEMO_CACHE_FETCH_NEXT_TUPLE	2	/* Get another tuple from the cache */
#define MEMO_FILLING_CACHE			3	/* Read outer node to fill cache */
#define MEMO_CACHE_BYPASS_MODE		4	/* Bypass mode.  Just read from our
										 * subplan without caching anything */
#define MEMO_END_OF_SCAN			5	/* Ready for rescan */


/* Helper macros for memory accounting */
#define EMPTY_ENTRY_MEMORY_BYTES(e)	(sizeof(MemoizeEntry) + \
									 sizeof(MemoizeKey) + \
									 (e)->key->params->t_len);
#define CACHE_TUPLE_BYTES(t)		(sizeof(MemoizeTuple) + \
									 (t)->mintuple->t_len)

/* MemoizeTuple Stores an individually cached tuple */
typedef struct MemoizeTuple
{
	MinimalTuple mintuple;		/* Cached tuple */
	struct MemoizeTuple *next;	/* The next tuple with the same parameter
								 * values or NULL if it's the last one */
} MemoizeTuple;

/*
 * MemoizeKey
 *		The hash table key for cached entries plus the LRU list link
 */
typedef struct MemoizeKey
{
	MinimalTuple params;
	dlist_node	lru_node;		/* Pointer to next/prev key in LRU list */
} MemoizeKey;

/*
 * MemoizeEntry
 *		The data struct that the cache hash table stores
 */
typedef struct MemoizeEntry
{
	MemoizeKey *key;			/* Hash key for hash table lookups */
	MemoizeTuple *tuplehead;	/* Pointer to the first tuple or NULL if
								 * no tuples are cached for this entry */
	uint32		hash;			/* Hash value (cached) */
	char		status;			/* Hash status */
	bool		complete;		/* Did we read the outer plan to completion? */
} MemoizeEntry;


#define SH_PREFIX memoize
#define SH_ELEMENT_TYPE MemoizeEntry
#define SH_KEY_TYPE MemoizeKey *
#define SH_SCOPE static inline
#define SH_DECLARE
#include "lib/simplehash.h"

static uint32 MemoizeHash_hash(struct memoize_hash *tb,
							   const MemoizeKey *key);
static bool MemoizeHash_equal(struct memoize_hash *tb,
							  const MemoizeKey *params1,
							  const MemoizeKey *params2);
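
/*
 * Generate the memoize_* hash table implementation itself.  MemoizeHash_hash
 * and MemoizeHash_equal, declared above, supply the hashing and key
 * comparison behaviour.
 */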
#define SH_PREFIX memoize
#define SH_ELEMENT_TYPE MemoizeEntry
#define SH_KEY_TYPE MemoizeKey *
#define SH_KEY key
#define SH_HASH_KEY(tb, key) MemoizeHash_hash(tb, key)
#define SH_EQUAL(tb, a, b) MemoizeHash_equal(tb, a, b)
#define SH_SCOPE static inline
#define SH_STORE_HASH
#define SH_GET_HASH(tb, a) a->hash
#define SH_DEFINE
#include "lib/simplehash.h"

/*
 * MemoizeHash_hash
 *		Hash function for simplehash hashtable.  'key' is unused here as we
 *		require that all table lookups first populate the MemoizeState's
 *		probeslot with the key values to be looked up.
 */
static uint32
MemoizeHash_hash(struct memoize_hash *tb, const MemoizeKey *key)
{
	MemoizeState *mstate = (MemoizeState *) tb->private_data;
	TupleTableSlot *pslot = mstate->probeslot;
	uint32		hashkey = 0;
	int			numkeys = mstate->nkeys;

	if (mstate->binary_mode)
	{
		for (int i = 0; i < numkeys; i++)
		{
			/* rotate hashkey left 1 bit at each step */
			hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);

			if (!pslot->tts_isnull[i])	/* treat nulls as having hash key 0 */
			{
				FormData_pg_attribute *attr;
				uint32		hkey;

				attr = &pslot->tts_tupleDescriptor->attrs[i];

				hkey = datum_image_hash(pslot->tts_values[i], attr->attbyval, attr->attlen);

				hashkey ^= hkey;
			}
		}
	}
	else
	{
		FmgrInfo   *hashfunctions = mstate->hashfunctions;
		Oid		   *collations = mstate->collations;

		for (int i = 0; i < numkeys; i++)
		{
			/* rotate hashkey left 1 bit at each step */
			hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);

			if (!pslot->tts_isnull[i])	/* treat nulls as having hash key 0 */
			{
				uint32		hkey;

				hkey = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[i],
														collations[i], pslot->tts_values[i]));
				hashkey ^= hkey;
			}
		}
	}

	return murmurhash32(hashkey);
}

/*
 * MemoizeHash_equal
 *		Equality function for confirming hash value matches during a hash
 *		table lookup.  'key2' is never used.  Instead the MemoizeState's
 *		probeslot is always populated with details of what's being looked up.
 */
static bool
MemoizeHash_equal(struct memoize_hash *tb, const MemoizeKey *key1,
				  const MemoizeKey *key2)
{
	MemoizeState *mstate = (MemoizeState *) tb->private_data;
	ExprContext *econtext = mstate->ss.ps.ps_ExprContext;
	TupleTableSlot *tslot = mstate->tableslot;
	TupleTableSlot *pslot = mstate->probeslot;

	/* probeslot should have already been prepared by prepare_probe_slot() */
	ExecStoreMinimalTuple(key1->params, tslot, false);

	if (mstate->binary_mode)
	{
		int			numkeys = mstate->nkeys;

		slot_getallattrs(tslot);
		slot_getallattrs(pslot);

		for (int i = 0; i < numkeys; i++)
		{
			FormData_pg_attribute *attr;

			if (tslot->tts_isnull[i] != pslot->tts_isnull[i])
				return false;

			/* both NULL? they're equal */
			if (tslot->tts_isnull[i])
				continue;

			/* perform binary comparison on the two datums */
			attr = &tslot->tts_tupleDescriptor->attrs[i];
			if (!datum_image_eq(tslot->tts_values[i], pslot->tts_values[i],
								attr->attbyval, attr->attlen))
				return false;
		}
		return true;
	}
	else
	{
		econtext->ecxt_innertuple = tslot;
		econtext->ecxt_outertuple = pslot;
		return ExecQualAndReset(mstate->cache_eq_expr, econtext);
	}
}

/*
 * Initialize the hash table to empty.
 */
static void
build_hash_table(MemoizeState *mstate, uint32 size)
{
	/* Make a guess at a good size when we're not given a valid size. */
	if (size == 0)
		size = 1024;

	/* memoize_create will convert the size to a power of 2 */
	mstate->hashtable = memoize_create(mstate->tableContext, size, mstate);
}

/*
 * prepare_probe_slot
 *		Populate mstate's probeslot with the values from the tuple stored
 *		in 'key'.  If 'key' is NULL, then perform the population by
 *		evaluating mstate's param_exprs.
 */
static inline void
prepare_probe_slot(MemoizeState *mstate, MemoizeKey *key)
{
	TupleTableSlot *pslot = mstate->probeslot;
	TupleTableSlot *tslot = mstate->tableslot;
	int			numKeys = mstate->nkeys;

	ExecClearTuple(pslot);

	if (key == NULL)
	{
		/* Set the probeslot's values based on the current parameter values */
		for (int i = 0; i < numKeys; i++)
			pslot->tts_values[i] = ExecEvalExpr(mstate->param_exprs[i],
												mstate->ss.ps.ps_ExprContext,
												&pslot->tts_isnull[i]);
	}
	else
	{
		/* Process the key's MinimalTuple and store the values in probeslot */
		ExecStoreMinimalTuple(key->params, tslot, false);
		slot_getallattrs(tslot);
		memcpy(pslot->tts_values, tslot->tts_values, sizeof(Datum) * numKeys);
		memcpy(pslot->tts_isnull, tslot->tts_isnull, sizeof(bool) * numKeys);
	}

	ExecStoreVirtualTuple(pslot);
}

/*
 * entry_purge_tuples
 *		Remove all tuples from the cache entry pointed to by 'entry'.  This
 *		leaves an empty cache entry.  Also, update the memory accounting to
 *		reflect the removal of the tuples.
 */
static inline void
entry_purge_tuples(MemoizeState *mstate, MemoizeEntry *entry)
{
	MemoizeTuple *tuple = entry->tuplehead;
	uint64		freed_mem = 0;

	while (tuple != NULL)
	{
		MemoizeTuple *next = tuple->next;

		freed_mem += CACHE_TUPLE_BYTES(tuple);

		/* Free memory used for this tuple */
		pfree(tuple->mintuple);
		pfree(tuple);

		tuple = next;
	}

	entry->complete = false;
	entry->tuplehead = NULL;

	/* Update the memory accounting */
	mstate->mem_used -= freed_mem;
}

/*
 * remove_cache_entry
 *		Remove 'entry' from the cache and free memory used by it.
 */
static void
remove_cache_entry(MemoizeState *mstate, MemoizeEntry *entry)
{
	MemoizeKey *key = entry->key;

	dlist_delete(&entry->key->lru_node);

	/* Remove all of the tuples from this entry */
	entry_purge_tuples(mstate, entry);

	/*
	 * Update memory accounting. entry_purge_tuples should have already
	 * subtracted the memory used for each cached tuple. Here we just update
	 * the amount used by the entry itself.
	 */
	mstate->mem_used -= EMPTY_ENTRY_MEMORY_BYTES(entry);

	/* Remove the entry from the cache */
	memoize_delete_item(mstate->hashtable, entry);

	pfree(key->params);
	pfree(key);
}

/*
 * cache_purge_all
 *		Remove all items from the cache
 */
static void
cache_purge_all(MemoizeState *mstate)
{
	uint64		evictions = mstate->hashtable->members;
	PlanState  *pstate = (PlanState *) mstate;

	/*
	 * Likely the most efficient way to remove all items is to just reset the
	 * memory context for the cache and then rebuild a fresh hash table. This
	 * saves having to remove each item one by one and pfree each cached tuple
	 */
	MemoryContextReset(mstate->tableContext);

	/* Make the hash table the same size as the original size */
	build_hash_table(mstate, ((Memoize *) pstate->plan)->est_entries);

	/* reset the LRU list */
	dlist_init(&mstate->lru_list);
	mstate->last_tuple = NULL;
	mstate->entry = NULL;

	mstate->mem_used = 0;

	/* XXX should we add something new to track these purges? */
	mstate->stats.cache_evictions += evictions; /* Update Stats */
}

/*
 * cache_reduce_memory
 *		Evict older and less recently used items from the cache in order to
 *		reduce the memory consumption back to something below the
 *		MemoizeState's mem_limit.
 *
 * 'specialkey', if not NULL, causes the function to return false if the entry
 * which the key belongs to is removed from the cache.
 */
static bool
cache_reduce_memory(MemoizeState *mstate, MemoizeKey *specialkey)
{
	bool		specialkey_intact = true;	/* for now */
	dlist_mutable_iter iter;
	uint64		evictions = 0;

	/* Update peak memory usage */
	if (mstate->mem_used > mstate->stats.mem_peak)
		mstate->stats.mem_peak = mstate->mem_used;

	/* We expect only to be called when we've gone over budget on memory */
	Assert(mstate->mem_used > mstate->mem_limit);

	/* Start the eviction process starting at the head of the LRU list. */
	dlist_foreach_modify(iter, &mstate->lru_list)
	{
		MemoizeKey *key = dlist_container(MemoizeKey, lru_node, iter.cur);
		MemoizeEntry *entry;

		/*
		 * Populate the hash probe slot in preparation for looking up this
		 * LRU entry.
		 */
		prepare_probe_slot(mstate, key);

		/*
		 * Ideally the LRU list pointers would be stored in the entry itself
		 * rather than in the key.  Unfortunately, we can't do that as the
		 * simplehash.h code may resize the table and allocate new memory for
		 * entries which would result in those pointers pointing to the old
		 * buckets.  However, it's fine to use the key to store this as that's
		 * only referenced by a pointer in the entry, which of course follows
		 * the entry whenever the hash table is resized.  Since we only have a
		 * pointer to the key here, we must perform a hash table lookup to
		 * find the entry that the key belongs to.
		 */
		entry = memoize_lookup(mstate->hashtable, NULL);

		/* A good spot to check for corruption of the table and LRU list. */
		Assert(entry != NULL);
		Assert(entry->key == key);

		/*
		 * If we're being called to free memory while the cache is being
		 * populated with new tuples, then we'd better take some care as we
		 * could end up freeing the entry which 'specialkey' belongs to.
		 * Generally callers will pass 'specialkey' as the key for the cache
		 * entry which is currently being populated, so we must set
		 * 'specialkey_intact' to false to inform the caller the specialkey
		 * entry has been removed.
		 */
		if (key == specialkey)
			specialkey_intact = false;

		/*
		 * Finally remove the entry.  This will remove from the LRU list too.
		 */
		remove_cache_entry(mstate, entry);

		evictions++;

		/* Exit if we've freed enough memory */
		if (mstate->mem_used <= mstate->mem_limit)
			break;
	}

	mstate->stats.cache_evictions += evictions; /* Update Stats */

	return specialkey_intact;
}

/*
 * cache_lookup
 *		Perform a lookup to see if we've already cached tuples based on the
 *		scan's current parameters.  If we find an existing entry we move it
 *		to the end of the LRU list, set *found to true then return it.  If
 *		we don't find an entry then we create a new one and add it to the end
 *		of the LRU list.  We also update cache memory accounting and remove
 *		older entries if we go over the memory budget.  If we managed to free
 *		enough memory we return the new entry, else we return NULL.
 *
 * Callers can assume we'll never return NULL when *found is true.
 */
static MemoizeEntry *
cache_lookup(MemoizeState *mstate, bool *found)
{
	MemoizeKey *key;
	MemoizeEntry *entry;
	MemoryContext oldcontext;

	/* prepare the probe slot with the current scan parameters */
	prepare_probe_slot(mstate, NULL);

	/*
	 * Add the new entry to the cache.  No need to pass a valid key since the
	 * hash function uses mstate's probeslot, which we populated above.
	 */
	entry = memoize_insert(mstate->hashtable, NULL, found);

	if (*found)
	{
		/*
		 * Move existing entry to the tail of the LRU list to mark it as the
		 * most recently used item.
		 */
		dlist_move_tail(&mstate->lru_list, &entry->key->lru_node);

		return entry;
	}

	oldcontext = MemoryContextSwitchTo(mstate->tableContext);

	/* Allocate a new key */
	entry->key = key = (MemoizeKey *) palloc(sizeof(MemoizeKey));
	key->params = ExecCopySlotMinimalTuple(mstate->probeslot);

	/* Update the total cache memory utilization */
	mstate->mem_used += EMPTY_ENTRY_MEMORY_BYTES(entry);

	/* Initialize this entry */
	entry->complete = false;
	entry->tuplehead = NULL;

	/*
	 * Since this is the most recently used entry, push this entry onto the
	 * end of the LRU list.
	 */
	dlist_push_tail(&mstate->lru_list, &entry->key->lru_node);

	mstate->last_tuple = NULL;

	MemoryContextSwitchTo(oldcontext);

	/*
	 * If we've gone over our memory budget, then we'll free up some space in
	 * the cache.
	 */
	if (mstate->mem_used > mstate->mem_limit)
	{
		/*
		 * Try to free up some memory.  It's highly unlikely that we'll fail
		 * to do so here since the entry we've just added is yet to contain
		 * any tuples and we're able to remove any other entry to reduce the
		 * memory consumption.
		 */
		if (unlikely(!cache_reduce_memory(mstate, key)))
			return NULL;

		/*
		 * The process of removing entries from the cache may have caused the
		 * code in simplehash.h to shuffle elements to earlier buckets in the
		 * hash table.  If it has, we'll need to find the entry again by
		 * performing a lookup.  Fortunately, we can detect if this has
		 * happened by seeing if the entry is still in use and that the key
		 * pointer matches our expected key.
		 */
		if (entry->status != memoize_SH_IN_USE || entry->key != key)
		{
			/*
			 * We need to repopulate the probeslot as lookups performed
			 * during the cache evictions above will have stored some other
			 * key.
			 */
			prepare_probe_slot(mstate, key);

			/* Re-find the newly added entry */
			entry = memoize_lookup(mstate->hashtable, NULL);
			Assert(entry != NULL);
		}
	}

	return entry;
}

/*
 * cache_store_tuple
 *		Add the tuple stored in 'slot' to the mstate's current cache entry.
 *		The cache entry must have already been made with cache_lookup().
 *		mstate's last_tuple field must point to the tail of mstate->entry's
 *		list of tuples.
 */
static bool
cache_store_tuple(MemoizeState *mstate, TupleTableSlot *slot)
{
	MemoizeTuple *tuple;
	MemoizeEntry *entry = mstate->entry;
	MemoryContext oldcontext;

	Assert(slot != NULL);
	Assert(entry != NULL);

	oldcontext = MemoryContextSwitchTo(mstate->tableContext);

	tuple = (MemoizeTuple *) palloc(sizeof(MemoizeTuple));
	tuple->mintuple = ExecCopySlotMinimalTuple(slot);
	tuple->next = NULL;

	/* Account for the memory we just consumed */
	mstate->mem_used += CACHE_TUPLE_BYTES(tuple);

	if (entry->tuplehead == NULL)
	{
		/*
		 * This is the first tuple for this entry, so just point the list
		 * head to it.
		 */
		entry->tuplehead = tuple;
	}
	else
	{
		/* push this tuple onto the tail of the list */
		mstate->last_tuple->next = tuple;
	}

	mstate->last_tuple = tuple;
	MemoryContextSwitchTo(oldcontext);

	/*
	 * If we've gone over our memory budget then free up some space in the
	 * cache.
	 */
	if (mstate->mem_used > mstate->mem_limit)
	{
		MemoizeKey *key = entry->key;

		if (!cache_reduce_memory(mstate, key))
			return false;

		/*
		 * The process of removing entries from the cache may have caused the
		 * code in simplehash.h to shuffle elements to earlier buckets in the
		 * hash table.  If it has, we'll need to find the entry again by
		 * performing a lookup.  Fortunately, we can detect if this has
		 * happened by seeing if the entry is still in use and that the key
		 * pointer matches our expected key.
		 */
		if (entry->status != memoize_SH_IN_USE || entry->key != key)
		{
			/*
			 * We need to repopulate the probeslot as lookups performed
			 * during the cache evictions above will have stored some other
			 * key.
			 */
			prepare_probe_slot(mstate, key);

			/* Re-find the entry */
			mstate->entry = entry = memoize_lookup(mstate->hashtable, NULL);
			Assert(entry != NULL);
		}
	}

	return true;
}
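
/*
 * ExecMemoize
 *		Lookup the cache for the current parameter values; return tuples
 *		from the cache when a complete entry exists, otherwise execute the
 *		subplan and, memory permitting, store its output for future rescans.
 */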
static TupleTableSlot *
ExecMemoize(PlanState *pstate)
{
	MemoizeState *node = castNode(MemoizeState, pstate);
	PlanState  *outerNode;
	TupleTableSlot *slot;

	switch (node->mstatus)
	{
		case MEMO_CACHE_LOOKUP:
			{
				MemoizeEntry *entry;
				TupleTableSlot *outerslot;
				bool		found;

				Assert(node->entry == NULL);

				/*
				 * We're only ever in this state for the first call of the
				 * scan.  Here we have a look to see if we've already seen
				 * the current parameters before and if we have already
				 * cached a complete set of records that the outer plan will
				 * return for these parameters.
				 *
				 * When we find a valid cache entry, we'll return the first
				 * tuple from it.  If not found, we'll create a cache entry
				 * and then try to fetch a tuple from the outer scan.  If we
				 * find one there, we'll try to cache it.
				 */

				/* see if we've got anything cached for the current parameters */
				entry = cache_lookup(node, &found);

				if (found && entry->complete)
				{
					node->stats.cache_hits += 1;	/* stats update */

					/*
					 * Set last_tuple and entry so that the state
					 * MEMO_CACHE_FETCH_NEXT_TUPLE can easily find the next
					 * tuple for these parameters.
					 */
					node->last_tuple = entry->tuplehead;
					node->entry = entry;

					/* Fetch the first cached tuple, if there is one */
					if (entry->tuplehead)
					{
						node->mstatus = MEMO_CACHE_FETCH_NEXT_TUPLE;

						slot = node->ss.ps.ps_ResultTupleSlot;
						ExecStoreMinimalTuple(entry->tuplehead->mintuple,
											  slot, false);

						return slot;
					}

					/* The cache entry is void of any tuples. */
					node->mstatus = MEMO_END_OF_SCAN;
					return NULL;
				}

				/* Handle cache miss */
				node->stats.cache_misses += 1;	/* stats update */

				if (found)
				{
					/*
					 * A cache entry was found, but the scan for that entry
					 * did not run to completion.  We'll just remove all
					 * tuples and start again.  It might be tempting to
					 * continue where we left off, but there's no guarantee
					 * the outer node will produce the tuples in the same
					 * order as it did last time.
					 */
					entry_purge_tuples(node, entry);
				}

				/* Scan the outer node for a tuple to cache */
				outerNode = outerPlanState(node);
				outerslot = ExecProcNode(outerNode);
				if (TupIsNull(outerslot))
				{
					/*
					 * cache_lookup may have returned NULL due to failure to
					 * free enough cache space, so ensure we don't do
					 * anything here that assumes it worked.  There's no need
					 * to go into bypass mode here as we're setting mstatus
					 * to end of scan.
					 */
					if (likely(entry))
						entry->complete = true;

					node->mstatus = MEMO_END_OF_SCAN;
					return NULL;
				}

				node->entry = entry;

				/*
				 * If we failed to create the entry or failed to store the
				 * tuple in the entry, then go into bypass mode.
				 */
				if (unlikely(entry == NULL ||
							 !cache_store_tuple(node, outerslot)))
				{
					node->stats.cache_overflows += 1;	/* stats update */

					node->mstatus = MEMO_CACHE_BYPASS_MODE;

					/*
					 * No need to clear out last_tuple as we'll stay in
					 * bypass mode until the end of the scan.
					 */
				}
				else
				{
					/*
					 * If we only expect a single row from this scan then we
					 * can mark that we're not expecting more.  This allows
					 * cache lookups to work even when the scan has not been
					 * executed to completion.
					 */
					entry->complete = node->singlerow;
					node->mstatus = MEMO_FILLING_CACHE;
				}

				slot = node->ss.ps.ps_ResultTupleSlot;
				ExecCopySlot(slot, outerslot);
				return slot;
			}

		case MEMO_CACHE_FETCH_NEXT_TUPLE:
			{
				/* We shouldn't be in this state if these are not set */
				Assert(node->entry != NULL);
				Assert(node->last_tuple != NULL);

				/* Skip to the next tuple to output */
				node->last_tuple = node->last_tuple->next;

				/* No more tuples in the cache */
				if (node->last_tuple == NULL)
				{
					node->mstatus = MEMO_END_OF_SCAN;
					return NULL;
				}

				slot = node->ss.ps.ps_ResultTupleSlot;
				ExecStoreMinimalTuple(node->last_tuple->mintuple, slot,
									  false);

				return slot;
			}

		case MEMO_FILLING_CACHE:
			{
				TupleTableSlot *outerslot;
				MemoizeEntry *entry = node->entry;

				/* entry should already have been set by MEMO_CACHE_LOOKUP */
				Assert(entry != NULL);

				/*
				 * When in the MEMO_FILLING_CACHE state, we've just had a
				 * cache miss and are populating the cache with the current
				 * scan tuples.
				 */
				outerNode = outerPlanState(node);
				outerslot = ExecProcNode(outerNode);
				if (TupIsNull(outerslot))
				{
					/* No more tuples.  Mark it as complete */
					entry->complete = true;
					node->mstatus = MEMO_END_OF_SCAN;
					return NULL;
				}

				/*
				 * Validate if the planner properly set the singlerow flag.
				 * It should only set that if each cache entry can, at most,
				 * return 1 row.
				 */
				if (unlikely(entry->complete))
					elog(ERROR, "cache entry already complete");

				/* Record the tuple in the current cache entry */
				if (unlikely(!cache_store_tuple(node, outerslot)))
				{
					/* Couldn't store it?  Handle overflow */
					node->stats.cache_overflows += 1;	/* stats update */

					node->mstatus = MEMO_CACHE_BYPASS_MODE;

					/*
					 * No need to clear out entry or last_tuple as we'll stay
					 * in bypass mode until the end of the scan.
					 */
				}

				slot = node->ss.ps.ps_ResultTupleSlot;
				ExecCopySlot(slot, outerslot);
				return slot;
			}

		case MEMO_CACHE_BYPASS_MODE:
			{
				TupleTableSlot *outerslot;

				/*
				 * When in bypass mode we just continue to read tuples
				 * without caching.  We need to wait until the next rescan
				 * before we can come out of this mode.
				 */
				outerNode = outerPlanState(node);
				outerslot = ExecProcNode(outerNode);
				if (TupIsNull(outerslot))
				{
					node->mstatus = MEMO_END_OF_SCAN;
					return NULL;
				}

				slot = node->ss.ps.ps_ResultTupleSlot;
				ExecCopySlot(slot, outerslot);
				return slot;
			}

		case MEMO_END_OF_SCAN:

			/*
			 * We've already returned NULL for this scan, but just in case
			 * something calls us again by mistake.
			 */
			return NULL;

		default:
			elog(ERROR, "unrecognized memoize state: %d",
				 (int) node->mstatus);
			return NULL;
	}							/* switch */
}
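
/*
 * ExecInitMemoize
 *		Initialize the MemoizeState node, its outer subplan and the hash
 *		table used as the cache.
 */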
MemoizeState *
ExecInitMemoize(Memoize *node, EState *estate, int eflags)
{
	MemoizeState *mstate = makeNode(MemoizeState);
	Plan	   *outerNode;
	int			i;
	int			nkeys;
	Oid		   *eqfuncoids;

	/* check for unsupported flags */
	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

	mstate->ss.ps.plan = (Plan *) node;
	mstate->ss.ps.state = estate;
	mstate->ss.ps.ExecProcNode = ExecMemoize;

	/*
	 * Miscellaneous initialization
	 *
	 * create expression context for node
	 */
	ExecAssignExprContext(estate, &mstate->ss.ps);

	outerNode = outerPlan(node);
	outerPlanState(mstate) = ExecInitNode(outerNode, estate, eflags);

	/*
	 * Initialize return slot and type.  No need to initialize projection
	 * info because this node doesn't do projections.
	 */
	ExecInitResultTupleSlotTL(&mstate->ss.ps, &TTSOpsMinimalTuple);
	mstate->ss.ps.ps_ProjInfo = NULL;

	/*
	 * Initialize scan slot and type.
	 */
	ExecCreateScanSlotFromOuterPlan(estate, &mstate->ss, &TTSOpsMinimalTuple);

	/*
	 * Set the state machine to lookup the cache.  We won't find anything
	 * until we cache something, but this saves a special case to create the
	 * first entry.
	 */
	mstate->mstatus = MEMO_CACHE_LOOKUP;

	mstate->nkeys = nkeys = node->numKeys;
	mstate->hashkeydesc = ExecTypeFromExprList(node->param_exprs);
	mstate->tableslot = MakeSingleTupleTableSlot(mstate->hashkeydesc,
												 &TTSOpsMinimalTuple);
	mstate->probeslot = MakeSingleTupleTableSlot(mstate->hashkeydesc,
												 &TTSOpsVirtual);

	mstate->param_exprs = (ExprState **) palloc(nkeys * sizeof(ExprState *));
	mstate->collations = node->collations; /* Just point directly to the plan
											* data */
	mstate->hashfunctions = (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));

	eqfuncoids = palloc(nkeys * sizeof(Oid));

	for (i = 0; i < nkeys; i++)
	{
		Oid			hashop = node->hashOperators[i];
		Oid			left_hashfn;
		Oid			right_hashfn;
		Expr	   *param_expr = (Expr *) list_nth(node->param_exprs, i);

		if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))
			elog(ERROR, "could not find hash function for hash operator %u",
				 hashop);

		fmgr_info(left_hashfn, &mstate->hashfunctions[i]);

		mstate->param_exprs[i] = ExecInitExpr(param_expr, (PlanState *) mstate);
		eqfuncoids[i] = get_opcode(hashop);
	}

	mstate->cache_eq_expr = ExecBuildParamSetEqual(mstate->hashkeydesc,
												   &TTSOpsMinimalTuple,
												   &TTSOpsVirtual,
												   eqfuncoids,
												   node->collations,
												   node->param_exprs,
												   (PlanState *) mstate);

	pfree(eqfuncoids);
	mstate->mem_used = 0;

	/* Limit the total memory consumed by the cache to this */
	mstate->mem_limit = get_hash_memory_limit();

	/* A memory context dedicated for the cache */
	mstate->tableContext = AllocSetContextCreate(CurrentMemoryContext,
												 "MemoizeHashTable",
												 ALLOCSET_DEFAULT_SIZES);

	dlist_init(&mstate->lru_list);
	mstate->last_tuple = NULL;
	mstate->entry = NULL;

	/*
	 * Mark if we can assume the cache entry is completed after we get the
	 * first record for it.  Some callers might not call us again after
	 * getting the first match.  e.g. A join operator performing a unique
	 * join is able to skip to the next outer tuple after getting the first
	 * matching inner tuple.  In this case, the cache entry is complete after
	 * getting the first tuple.  This allows us to mark it as so.
	 */
	mstate->singlerow = node->singlerow;
	mstate->keyparamids = node->keyparamids;

	/*
	 * Record if the cache keys should be compared bit by bit, or logically
	 * using the type's hash equality operator
	 */
	mstate->binary_mode = node->binary_mode;

	/* Zero the statistics counters */
	memset(&mstate->stats, 0, sizeof(MemoizeInstrumentation));

	/* Allocate and set up the actual cache */
	build_hash_table(mstate, node->est_entries);

	return mstate;
}
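
/*
 * ExecEndMemoize
 *		Shut down the node: cross-check the memory accounting in assert
 *		builds, save instrumentation from parallel workers, release the
 *		cache's memory context and shut down the subplan.
 */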
void
ExecEndMemoize(MemoizeState *node)
{
#ifdef USE_ASSERT_CHECKING
	/* Validate the memory accounting code is correct in assert builds. */
	{
		int			count;
		uint64		mem = 0;
		memoize_iterator i;
		MemoizeEntry *entry;

		memoize_start_iterate(node->hashtable, &i);

		count = 0;
		while ((entry = memoize_iterate(node->hashtable, &i)) != NULL)
		{
			MemoizeTuple *tuple = entry->tuplehead;

			mem += EMPTY_ENTRY_MEMORY_BYTES(entry);
			while (tuple != NULL)
			{
				mem += CACHE_TUPLE_BYTES(tuple);
				tuple = tuple->next;
			}
			count++;
		}

		Assert(count == node->hashtable->members);
		Assert(mem == node->mem_used);
	}
#endif

	/*
	 * When ending a parallel worker, copy the statistics gathered by the
	 * worker back into shared memory so that it can be picked up by the main
	 * process to report in EXPLAIN ANALYZE.
	 */
	if (node->shared_info != NULL && IsParallelWorker())
	{
		MemoizeInstrumentation *si;

		/* Make mem_peak available for EXPLAIN */
		if (node->stats.mem_peak == 0)
			node->stats.mem_peak = node->mem_used;

		Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
		si = &node->shared_info->sinstrument[ParallelWorkerNumber];
		memcpy(si, &node->stats, sizeof(MemoizeInstrumentation));
	}

	/* Remove the cache context */
	MemoryContextDelete(node->tableContext);

	ExecClearTuple(node->ss.ss_ScanTupleSlot);
	/* must drop pointer to cache result tuple */
	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);

	/*
	 * free exprcontext
	 */
	ExecFreeExprContext(&node->ss.ps);

	/*
	 * shut down the subplan
	 */
	ExecEndNode(outerPlanState(node));
}
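
/*
 * ExecReScanMemoize
 *		Prepare to rescan the Memoize node.  The hash table is kept, but the
 *		whole cache is purged if a parameter that is not part of the cache
 *		key has changed, since existing entries may then be out-dated.
 */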
void
ExecReScanMemoize(MemoizeState *node)
{
	PlanState  *outerPlan = outerPlanState(node);

	/* Mark that we must lookup the cache for a new set of parameters */
	node->mstatus = MEMO_CACHE_LOOKUP;

	/* nullify pointers used for the last scan */
	node->entry = NULL;
	node->last_tuple = NULL;

	/*
	 * if chgParam of subnode is not null then plan will be re-scanned by
	 * first ExecProcNode.
	 */
	if (outerPlan->chgParam == NULL)
		ExecReScan(outerPlan);

	/*
	 * Purge the entire cache if a parameter changed that is not part of the
	 * cache key.
	 */
	if (bms_nonempty_difference(outerPlan->chgParam, node->keyparamids))
		cache_purge_all(node);
}

/*
 * ExecEstimateCacheEntryOverheadBytes
 *		For use in the query planner to help it estimate the amount of memory
 *		required to store a single entry in the cache.
 */
double
ExecEstimateCacheEntryOverheadBytes(double ntuples)
{
	return sizeof(MemoizeEntry) + sizeof(MemoizeKey) + sizeof(MemoizeTuple) *
		ntuples;
}

/* ----------------------------------------------------------------
 *						Parallel Query Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *		ExecMemoizeEstimate
 *
 *		Estimate space required to propagate memoize statistics.
 * ----------------------------------------------------------------
 */
void
ExecMemoizeEstimate(MemoizeState *node, ParallelContext *pcxt)
{
	Size		size;

	/* don't need this if not instrumenting or no workers */
	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
		return;

	size = mul_size(pcxt->nworkers, sizeof(MemoizeInstrumentation));
	size = add_size(size, offsetof(SharedMemoizeInfo, sinstrument));
	shm_toc_estimate_chunk(&pcxt->estimator, size);
	shm_toc_estimate_keys(&pcxt->estimator, 1);
}

/* ----------------------------------------------------------------
 *		ExecMemoizeInitializeDSM
 *
 *		Initialize DSM space for memoize statistics.
 * ----------------------------------------------------------------
 */
void
ExecMemoizeInitializeDSM(MemoizeState *node, ParallelContext *pcxt)
{
	Size		size;

	/* don't need this if not instrumenting or no workers */
	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
		return;

	size = offsetof(SharedMemoizeInfo, sinstrument)
		+ pcxt->nworkers * sizeof(MemoizeInstrumentation);
	node->shared_info = shm_toc_allocate(pcxt->toc, size);
	/* ensure any unfilled slots will contain zeroes */
	memset(node->shared_info, 0, size);
	node->shared_info->num_workers = pcxt->nworkers;
	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
				   node->shared_info);
}

/* ----------------------------------------------------------------
 *		ExecMemoizeInitializeWorker
 *
 *		Attach worker to DSM space for memoize statistics.
 * ----------------------------------------------------------------
 */
void
ExecMemoizeInitializeWorker(MemoizeState *node, ParallelWorkerContext *pwcxt)
{
	node->shared_info =
		shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
}

/* ----------------------------------------------------------------
 *		ExecMemoizeRetrieveInstrumentation
 *
 *		Transfer memoize statistics from DSM to private memory.
 * ----------------------------------------------------------------
 */
void
ExecMemoizeRetrieveInstrumentation(MemoizeState *node)
{
	Size		size;
	SharedMemoizeInfo *si;

	if (node->shared_info == NULL)
		return;

	size = offsetof(SharedMemoizeInfo, sinstrument)
		+ node->shared_info->num_workers * sizeof(MemoizeInstrumentation);
	si = palloc(size);
	memcpy(si, node->shared_info, size);
	node->shared_info = si;
}