1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-18 17:42:25 +03:00

Create an API for inserting and deleting rows in TOAST tables.

This moves much of the non-heap-specific logic from toast_delete and
toast_insert_or_update into a helper functions accessible via a new
header, toast_helper.h.  Using the functions in this module, a table
AM can implement creation and deletion of TOAST table rows with
much less code duplication than was possible heretofore.  Some
table AMs won't want to use the TOAST logic at all, but for those
that do this will make that easier.

Patch by me, reviewed and tested by Prabhat Sabu, Thomas Munro,
Andres Freund, and Álvaro Herrera.

Discussion: http://postgr.es/m/CA+TgmoZv-=2iWM4jcw5ZhJeL18HF96+W1yJeYrnGMYdkFFnEpQ@mail.gmail.com
This commit is contained in:
Robert Haas
2019-09-06 10:38:51 -04:00
parent 286af0ce12
commit bd124996ef
5 changed files with 493 additions and 357 deletions

View File

@ -27,6 +27,7 @@
#include "access/detoast.h"
#include "access/heapam.h"
#include "access/heaptoast.h"
#include "access/toast_helper.h"
#include "access/toast_internals.h"
@ -40,8 +41,6 @@ void
toast_delete(Relation rel, HeapTuple oldtup, bool is_speculative)
{
TupleDesc tupleDesc;
int numAttrs;
int i;
Datum toast_values[MaxHeapAttributeNumber];
bool toast_isnull[MaxHeapAttributeNumber];
@ -64,27 +63,12 @@ toast_delete(Relation rel, HeapTuple oldtup, bool is_speculative)
* least one varlena column, by the way.)
*/
tupleDesc = rel->rd_att;
numAttrs = tupleDesc->natts;
Assert(numAttrs <= MaxHeapAttributeNumber);
Assert(tupleDesc->natts <= MaxHeapAttributeNumber);
heap_deform_tuple(oldtup, tupleDesc, toast_values, toast_isnull);
/*
* Check for external stored attributes and delete them from the secondary
* relation.
*/
for (i = 0; i < numAttrs; i++)
{
if (TupleDescAttr(tupleDesc, i)->attlen == -1)
{
Datum value = toast_values[i];
if (toast_isnull[i])
continue;
else if (VARATT_IS_EXTERNAL_ONDISK(PointerGetDatum(value)))
toast_delete_datum(rel, value, is_speculative);
}
}
/* Do the real work. */
toast_delete_external(rel, toast_values, toast_isnull, is_speculative);
}
@ -113,25 +97,16 @@ toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
HeapTuple result_tuple;
TupleDesc tupleDesc;
int numAttrs;
int i;
bool need_change = false;
bool need_free = false;
bool need_delold = false;
bool has_nulls = false;
Size maxDataLen;
Size hoff;
char toast_action[MaxHeapAttributeNumber];
bool toast_isnull[MaxHeapAttributeNumber];
bool toast_oldisnull[MaxHeapAttributeNumber];
Datum toast_values[MaxHeapAttributeNumber];
Datum toast_oldvalues[MaxHeapAttributeNumber];
struct varlena *toast_oldexternal[MaxHeapAttributeNumber];
int32 toast_sizes[MaxHeapAttributeNumber];
bool toast_free[MaxHeapAttributeNumber];
bool toast_delold[MaxHeapAttributeNumber];
ToastAttrInfo toast_attr[MaxHeapAttributeNumber];
ToastTupleContext ttc;
/*
* Ignore the INSERT_SPECULATIVE option. Speculative insertions/super
@ -160,129 +135,24 @@ toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
heap_deform_tuple(oldtup, tupleDesc, toast_oldvalues, toast_oldisnull);
/* ----------
* Then collect information about the values given
*
* NOTE: toast_action[i] can have these values:
* ' ' default handling
* 'p' already processed --- don't touch it
* 'x' incompressible, but OK to move off
*
* NOTE: toast_sizes[i] is only made valid for varlena attributes with
* toast_action[i] different from 'p'.
* Prepare for toasting
* ----------
*/
memset(toast_action, ' ', numAttrs * sizeof(char));
memset(toast_oldexternal, 0, numAttrs * sizeof(struct varlena *));
memset(toast_free, 0, numAttrs * sizeof(bool));
memset(toast_delold, 0, numAttrs * sizeof(bool));
for (i = 0; i < numAttrs; i++)
ttc.ttc_rel = rel;
ttc.ttc_values = toast_values;
ttc.ttc_isnull = toast_isnull;
if (oldtup == NULL)
{
Form_pg_attribute att = TupleDescAttr(tupleDesc, i);
struct varlena *old_value;
struct varlena *new_value;
if (oldtup != NULL)
{
/*
* For UPDATE get the old and new values of this attribute
*/
old_value = (struct varlena *) DatumGetPointer(toast_oldvalues[i]);
new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
/*
* If the old value is stored on disk, check if it has changed so
* we have to delete it later.
*/
if (att->attlen == -1 && !toast_oldisnull[i] &&
VARATT_IS_EXTERNAL_ONDISK(old_value))
{
if (toast_isnull[i] || !VARATT_IS_EXTERNAL_ONDISK(new_value) ||
memcmp((char *) old_value, (char *) new_value,
VARSIZE_EXTERNAL(old_value)) != 0)
{
/*
* The old external stored value isn't needed any more
* after the update
*/
toast_delold[i] = true;
need_delold = true;
}
else
{
/*
* This attribute isn't changed by this update so we reuse
* the original reference to the old value in the new
* tuple.
*/
toast_action[i] = 'p';
continue;
}
}
}
else
{
/*
* For INSERT simply get the new value
*/
new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
}
/*
* Handle NULL attributes
*/
if (toast_isnull[i])
{
toast_action[i] = 'p';
has_nulls = true;
continue;
}
/*
* Now look at varlena attributes
*/
if (att->attlen == -1)
{
/*
* If the table's attribute says PLAIN always, force it so.
*/
if (att->attstorage == 'p')
toast_action[i] = 'p';
/*
* We took care of UPDATE above, so any external value we find
* still in the tuple must be someone else's that we cannot reuse
* (this includes the case of an out-of-line in-memory datum).
* Fetch it back (without decompression, unless we are forcing
* PLAIN storage). If necessary, we'll push it out as a new
* external value below.
*/
if (VARATT_IS_EXTERNAL(new_value))
{
toast_oldexternal[i] = new_value;
if (att->attstorage == 'p')
new_value = heap_tuple_untoast_attr(new_value);
else
new_value = heap_tuple_fetch_attr(new_value);
toast_values[i] = PointerGetDatum(new_value);
toast_free[i] = true;
need_change = true;
need_free = true;
}
/*
* Remember the size of this attribute
*/
toast_sizes[i] = VARSIZE_ANY(new_value);
}
else
{
/*
* Not a varlena attribute, plain storage always
*/
toast_action[i] = 'p';
}
ttc.ttc_oldvalues = NULL;
ttc.ttc_oldisnull = NULL;
}
else
{
ttc.ttc_oldvalues = toast_oldvalues;
ttc.ttc_oldisnull = toast_oldisnull;
}
ttc.ttc_attr = toast_attr;
toast_tuple_init(&ttc);
/* ----------
* Compress and/or save external until data fits into target length
@ -297,7 +167,7 @@ toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
/* compute header overhead --- this should match heap_form_tuple() */
hoff = SizeofHeapTupleHeader;
if (has_nulls)
if ((ttc.ttc_flags & TOAST_HAS_NULLS) != 0)
hoff += BITMAPLEN(numAttrs);
hoff = MAXALIGN(hoff);
/* now convert to a limit on the tuple data size */
@ -310,66 +180,21 @@ toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
while (heap_compute_data_size(tupleDesc,
toast_values, toast_isnull) > maxDataLen)
{
int biggest_attno = -1;
int32 biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
Datum old_value;
Datum new_value;
/*
* Search for the biggest yet unprocessed internal attribute
*/
for (i = 0; i < numAttrs; i++)
{
Form_pg_attribute att = TupleDescAttr(tupleDesc, i);
if (toast_action[i] != ' ')
continue;
if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
continue; /* can't happen, toast_action would be 'p' */
if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
continue;
if (att->attstorage != 'x' && att->attstorage != 'e')
continue;
if (toast_sizes[i] > biggest_size)
{
biggest_attno = i;
biggest_size = toast_sizes[i];
}
}
int biggest_attno;
biggest_attno = toast_tuple_find_biggest_attribute(&ttc, true, false);
if (biggest_attno < 0)
break;
/*
* Attempt to compress it inline, if it has attstorage 'x'
*/
i = biggest_attno;
if (TupleDescAttr(tupleDesc, i)->attstorage == 'x')
{
old_value = toast_values[i];
new_value = toast_compress_datum(old_value);
if (DatumGetPointer(new_value) != NULL)
{
/* successful compression */
if (toast_free[i])
pfree(DatumGetPointer(old_value));
toast_values[i] = new_value;
toast_free[i] = true;
toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
need_change = true;
need_free = true;
}
else
{
/* incompressible, ignore on subsequent compression passes */
toast_action[i] = 'x';
}
}
if (TupleDescAttr(tupleDesc, biggest_attno)->attstorage == 'x')
toast_tuple_try_compression(&ttc, biggest_attno);
else
{
/* has attstorage 'e', ignore on subsequent compression passes */
toast_action[i] = 'x';
toast_attr[biggest_attno].tai_colflags |= TOASTCOL_INCOMPRESSIBLE;
}
/*
@ -380,72 +205,26 @@ toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
*
* XXX maybe the threshold should be less than maxDataLen?
*/
if (toast_sizes[i] > maxDataLen &&
if (toast_attr[biggest_attno].tai_size > maxDataLen &&
rel->rd_rel->reltoastrelid != InvalidOid)
{
old_value = toast_values[i];
toast_action[i] = 'p';
toast_values[i] = toast_save_datum(rel, toast_values[i],
toast_oldexternal[i], options);
if (toast_free[i])
pfree(DatumGetPointer(old_value));
toast_free[i] = true;
need_change = true;
need_free = true;
}
toast_tuple_externalize(&ttc, biggest_attno, options);
}
/*
* Second we look for attributes of attstorage 'x' or 'e' that are still
* inline. But skip this if there's no toast table to push them to.
* inline, and make them external. But skip this if there's no toast
* table to push them to.
*/
while (heap_compute_data_size(tupleDesc,
toast_values, toast_isnull) > maxDataLen &&
rel->rd_rel->reltoastrelid != InvalidOid)
{
int biggest_attno = -1;
int32 biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
Datum old_value;
/*------
* Search for the biggest yet inlined attribute with
* attstorage equals 'x' or 'e'
*------
*/
for (i = 0; i < numAttrs; i++)
{
Form_pg_attribute att = TupleDescAttr(tupleDesc, i);
if (toast_action[i] == 'p')
continue;
if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
continue; /* can't happen, toast_action would be 'p' */
if (att->attstorage != 'x' && att->attstorage != 'e')
continue;
if (toast_sizes[i] > biggest_size)
{
biggest_attno = i;
biggest_size = toast_sizes[i];
}
}
int biggest_attno;
biggest_attno = toast_tuple_find_biggest_attribute(&ttc, false, false);
if (biggest_attno < 0)
break;
/*
* Store this external
*/
i = biggest_attno;
old_value = toast_values[i];
toast_action[i] = 'p';
toast_values[i] = toast_save_datum(rel, toast_values[i],
toast_oldexternal[i], options);
if (toast_free[i])
pfree(DatumGetPointer(old_value));
toast_free[i] = true;
need_change = true;
need_free = true;
toast_tuple_externalize(&ttc, biggest_attno, options);
}
/*
@ -455,57 +234,13 @@ toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
while (heap_compute_data_size(tupleDesc,
toast_values, toast_isnull) > maxDataLen)
{
int biggest_attno = -1;
int32 biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
Datum old_value;
Datum new_value;
/*
* Search for the biggest yet uncompressed internal attribute
*/
for (i = 0; i < numAttrs; i++)
{
if (toast_action[i] != ' ')
continue;
if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
continue; /* can't happen, toast_action would be 'p' */
if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
continue;
if (TupleDescAttr(tupleDesc, i)->attstorage != 'm')
continue;
if (toast_sizes[i] > biggest_size)
{
biggest_attno = i;
biggest_size = toast_sizes[i];
}
}
int biggest_attno;
biggest_attno = toast_tuple_find_biggest_attribute(&ttc, true, true);
if (biggest_attno < 0)
break;
/*
* Attempt to compress it inline
*/
i = biggest_attno;
old_value = toast_values[i];
new_value = toast_compress_datum(old_value);
if (DatumGetPointer(new_value) != NULL)
{
/* successful compression */
if (toast_free[i])
pfree(DatumGetPointer(old_value));
toast_values[i] = new_value;
toast_free[i] = true;
toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
need_change = true;
need_free = true;
}
else
{
/* incompressible, ignore on subsequent compression passes */
toast_action[i] = 'x';
}
toast_tuple_try_compression(&ttc, biggest_attno);
}
/*
@ -519,54 +254,20 @@ toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
toast_values, toast_isnull) > maxDataLen &&
rel->rd_rel->reltoastrelid != InvalidOid)
{
int biggest_attno = -1;
int32 biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
Datum old_value;
/*--------
* Search for the biggest yet inlined attribute with
* attstorage = 'm'
*--------
*/
for (i = 0; i < numAttrs; i++)
{
if (toast_action[i] == 'p')
continue;
if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
continue; /* can't happen, toast_action would be 'p' */
if (TupleDescAttr(tupleDesc, i)->attstorage != 'm')
continue;
if (toast_sizes[i] > biggest_size)
{
biggest_attno = i;
biggest_size = toast_sizes[i];
}
}
int biggest_attno;
biggest_attno = toast_tuple_find_biggest_attribute(&ttc, false, true);
if (biggest_attno < 0)
break;
/*
* Store this external
*/
i = biggest_attno;
old_value = toast_values[i];
toast_action[i] = 'p';
toast_values[i] = toast_save_datum(rel, toast_values[i],
toast_oldexternal[i], options);
if (toast_free[i])
pfree(DatumGetPointer(old_value));
toast_free[i] = true;
need_change = true;
need_free = true;
toast_tuple_externalize(&ttc, biggest_attno, options);
}
/*
* In the case we toasted any values, we need to build a new heap tuple
* with the changed values.
*/
if (need_change)
if ((ttc.ttc_flags & TOAST_NEEDS_CHANGE) != 0)
{
HeapTupleHeader olddata = newtup->t_data;
HeapTupleHeader new_data;
@ -585,7 +286,7 @@ toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
* whether there needs to be one at all.
*/
new_header_len = SizeofHeapTupleHeader;
if (has_nulls)
if ((ttc.ttc_flags & TOAST_HAS_NULLS) != 0)
new_header_len += BITMAPLEN(numAttrs);
new_header_len = MAXALIGN(new_header_len);
new_data_len = heap_compute_data_size(tupleDesc,
@ -616,26 +317,13 @@ toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
(char *) new_data + new_header_len,
new_data_len,
&(new_data->t_infomask),
has_nulls ? new_data->t_bits : NULL);
((ttc.ttc_flags & TOAST_HAS_NULLS) != 0) ?
new_data->t_bits : NULL);
}
else
result_tuple = newtup;
/*
* Free allocated temp values
*/
if (need_free)
for (i = 0; i < numAttrs; i++)
if (toast_free[i])
pfree(DatumGetPointer(toast_values[i]));
/*
* Delete external values from the old tuple
*/
if (need_delold)
for (i = 0; i < numAttrs; i++)
if (toast_delold[i])
toast_delete_datum(rel, toast_oldvalues[i], false);
toast_tuple_cleanup(&ttc);
return result_tuple;
}