1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-18 17:42:25 +03:00

Allow configurable LZ4 TOAST compression.

There is now a per-column COMPRESSION option which can be set to pglz
(the default, and the only option in up until now) or lz4. Or, if you
like, you can set the new default_toast_compression GUC to lz4, and
then that will be the default for new table columns for which no value
is specified. We don't have lz4 support in the PostgreSQL code, so
to use lz4 compression, PostgreSQL must be built --with-lz4.

In general, TOAST compression means compression of individual column
values, not the whole tuple, and those values can either be compressed
inline within the tuple or compressed and then stored externally in
the TOAST table, so those properties also apply to this feature.

Prior to this commit, a TOAST pointer has two unused bits as part of
the va_extsize field, and a compessed datum has two unused bits as
part of the va_rawsize field. These bits are unused because the length
of a varlena is limited to 1GB; we now use them to indicate the
compression type that was used. This means we only have bit space for
2 more built-in compresison types, but we could work around that
problem, if necessary, by introducing a new vartag_external value for
any further types we end up wanting to add. Hopefully, it won't be
too important to offer a wide selection of algorithms here, since
each one we add not only takes more coding but also adds a build
dependency for every packager. Nevertheless, it seems worth doing
at least this much, because LZ4 gets better compression than PGLZ
with less CPU usage.

It's possible for LZ4-compressed datums to leak into composite type
values stored on disk, just as it is for PGLZ. It's also possible for
LZ4-compressed attributes to be copied into a different table via SQL
commands such as CREATE TABLE AS or INSERT .. SELECT.  It would be
expensive to force such values to be decompressed, so PostgreSQL has
never done so. For the same reasons, we also don't force recompression
of already-compressed values even if the target table prefers a
different compression method than was used for the source data.  These
architectural decisions are perhaps arguable but revisiting them is
well beyond the scope of what seemed possible to do as part of this
project.  However, it's relatively cheap to recompress as part of
VACUUM FULL or CLUSTER, so this commit adjusts those commands to do
so, if the configured compression method of the table happens not to
match what was used for some column value stored therein.

Dilip Kumar. The original patches on which this work was based were
written by Ildus Kurbangaliev, and those were patches were based on
even earlier work by Nikita Glukhov, but the design has since changed
very substantially, since allow a potentially large number of
compression methods that could be added and dropped on a running
system proved too problematic given some of the architectural issues
mentioned above; the choice of which specific compression method to
add first is now different; and a lot of the code has been heavily
refactored.  More recently, Justin Przyby helped quite a bit with
testing and reviewing and this version also includes some code
contributions from him. Other design input and review from Tomas
Vondra, Álvaro Herrera, Andres Freund, Oleg Bartunov, Alexander
Korotkov, and me.

Discussion: http://postgr.es/m/20170907194236.4cefce96%40wp.localdomain
Discussion: http://postgr.es/m/CAFiTN-uUpX3ck%3DK0mLEk-G_kUQY%3DSNOTeqdaNRR9FMdQrHKebw%40mail.gmail.com
This commit is contained in:
Robert Haas
2021-03-19 15:10:38 -04:00
parent e589c4890b
commit bbe0a81db6
61 changed files with 2261 additions and 160 deletions

View File

@ -23,6 +23,7 @@
#include "access/relscan.h"
#include "access/sysattr.h"
#include "access/tableam.h"
#include "access/toast_compression.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/catalog.h"
@ -527,6 +528,8 @@ static void ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKM
static void ATExecGenericOptions(Relation rel, List *options);
static void ATExecSetRowSecurity(Relation rel, bool rls);
static void ATExecForceNoForceRowSecurity(Relation rel, bool force_rls);
static ObjectAddress ATExecSetCompression(AlteredTableInfo *tab, Relation rel,
const char *column, Node *newValue, LOCKMODE lockmode);
static void index_copy_data(Relation rel, RelFileNode newrnode);
static const char *storage_name(char c);
@ -558,6 +561,7 @@ static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx,
static List *GetParentedForeignKeyRefs(Relation partition);
static void ATDetachCheckNoForeignKeyRefs(Relation partition);
static void ATExecAlterCollationRefreshVersion(Relation rel, List *coll);
static char GetAttributeCompression(Form_pg_attribute att, char *compression);
/* ----------------------------------------------------------------
@ -852,6 +856,18 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
if (colDef->generated)
attr->attgenerated = colDef->generated;
/*
* lookup attribute's compression method and store it in the
* attr->attcompression.
*/
if (relkind == RELKIND_RELATION ||
relkind == RELKIND_PARTITIONED_TABLE ||
relkind == RELKIND_MATVIEW)
attr->attcompression =
GetAttributeCompression(attr, colDef->compression);
else
attr->attcompression = InvalidCompressionMethod;
}
/*
@ -2396,6 +2412,22 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
storage_name(def->storage),
storage_name(attribute->attstorage))));
/* Copy/check compression parameter */
if (CompressionMethodIsValid(attribute->attcompression))
{
const char *compression =
GetCompressionMethodName(attribute->attcompression);
if (def->compression == NULL)
def->compression = pstrdup(compression);
else if (strcmp(def->compression, compression) != 0)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("column \"%s\" has a compression method conflict",
attributeName),
errdetail("%s versus %s", def->compression, compression)));
}
def->inhcount++;
/* Merge of NOT NULL constraints = OR 'em together */
def->is_not_null |= attribute->attnotnull;
@ -2430,6 +2462,11 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
def->collOid = attribute->attcollation;
def->constraints = NIL;
def->location = -1;
if (CompressionMethodIsValid(attribute->attcompression))
def->compression = pstrdup(GetCompressionMethodName(
attribute->attcompression));
else
def->compression = NULL;
inhSchema = lappend(inhSchema, def);
newattmap->attnums[parent_attno - 1] = ++child_attno;
}
@ -2675,6 +2712,19 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
storage_name(def->storage),
storage_name(newdef->storage))));
/* Copy compression parameter */
if (def->compression == NULL)
def->compression = newdef->compression;
else if (newdef->compression != NULL)
{
if (strcmp(def->compression, newdef->compression) != 0)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("column \"%s\" has a compression method conflict",
attributeName),
errdetail("%s versus %s", def->compression, newdef->compression)));
}
/* Mark the column as locally defined */
def->is_local = true;
/* Merge of NOT NULL constraints = OR 'em together */
@ -3961,6 +4011,7 @@ AlterTableGetLockLevel(List *cmds)
case AT_DropIdentity:
case AT_SetIdentity:
case AT_DropExpression:
case AT_SetCompression:
cmd_lockmode = AccessExclusiveLock;
break;
@ -4283,6 +4334,12 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
/* No command-specific prep needed */
pass = AT_PASS_MISC;
break;
case AT_SetCompression: /* ALTER COLUMN SET COMPRESSION */
ATSimplePermissions(rel, ATT_TABLE | ATT_MATVIEW);
/* This command never recurses */
/* No command-specific prep needed */
pass = AT_PASS_MISC;
break;
case AT_DropColumn: /* DROP COLUMN */
ATSimplePermissions(rel,
ATT_TABLE | ATT_COMPOSITE_TYPE | ATT_FOREIGN_TABLE);
@ -4626,6 +4683,10 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
case AT_SetStorage: /* ALTER COLUMN SET STORAGE */
address = ATExecSetStorage(rel, cmd->name, cmd->def, lockmode);
break;
case AT_SetCompression:
address = ATExecSetCompression(tab, rel, cmd->name, cmd->def,
lockmode);
break;
case AT_DropColumn: /* DROP COLUMN */
address = ATExecDropColumn(wqueue, rel, cmd->name,
cmd->behavior, false, false,
@ -6340,6 +6401,18 @@ ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel,
attribute.attislocal = colDef->is_local;
attribute.attinhcount = colDef->inhcount;
attribute.attcollation = collOid;
/*
* lookup attribute's compression method and store it in the
* attr->attcompression.
*/
if (rel->rd_rel->relkind == RELKIND_RELATION ||
rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
attribute.attcompression = GetAttributeCompression(&attribute,
colDef->compression);
else
attribute.attcompression = InvalidCompressionMethod;
/* attribute.attacl is handled by InsertPgAttributeTuples() */
ReleaseSysCache(typeTuple);
@ -7712,6 +7785,68 @@ ATExecSetOptions(Relation rel, const char *colName, Node *options,
return address;
}
/*
* Helper function for ATExecSetStorage and ATExecSetCompression
*
* Set the attcompression and/or attstorage for the respective index attribute
* if the respective input values are valid.
*/
static void
SetIndexStorageProperties(Relation rel, Relation attrelation,
AttrNumber attnum, char newcompression,
char newstorage, LOCKMODE lockmode)
{
HeapTuple tuple;
ListCell *lc;
Form_pg_attribute attrtuple;
foreach(lc, RelationGetIndexList(rel))
{
Oid indexoid = lfirst_oid(lc);
Relation indrel;
AttrNumber indattnum = 0;
indrel = index_open(indexoid, lockmode);
for (int i = 0; i < indrel->rd_index->indnatts; i++)
{
if (indrel->rd_index->indkey.values[i] == attnum)
{
indattnum = i + 1;
break;
}
}
if (indattnum == 0)
{
index_close(indrel, lockmode);
continue;
}
tuple = SearchSysCacheCopyAttNum(RelationGetRelid(indrel), indattnum);
if (HeapTupleIsValid(tuple))
{
attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
if (CompressionMethodIsValid(newcompression))
attrtuple->attcompression = newcompression;
if (newstorage != '\0')
attrtuple->attstorage = newstorage;
CatalogTupleUpdate(attrelation, &tuple->t_self, tuple);
InvokeObjectPostAlterHook(RelationRelationId,
RelationGetRelid(rel),
attrtuple->attnum);
heap_freetuple(tuple);
}
index_close(indrel, lockmode);
}
}
/*
* ALTER TABLE ALTER COLUMN SET STORAGE
*
@ -7727,7 +7862,6 @@ ATExecSetStorage(Relation rel, const char *colName, Node *newValue, LOCKMODE loc
Form_pg_attribute attrtuple;
AttrNumber attnum;
ObjectAddress address;
ListCell *lc;
Assert(IsA(newValue, String));
storagemode = strVal(newValue);
@ -7791,47 +7925,9 @@ ATExecSetStorage(Relation rel, const char *colName, Node *newValue, LOCKMODE loc
* Apply the change to indexes as well (only for simple index columns,
* matching behavior of index.c ConstructTupleDescriptor()).
*/
foreach(lc, RelationGetIndexList(rel))
{
Oid indexoid = lfirst_oid(lc);
Relation indrel;
AttrNumber indattnum = 0;
indrel = index_open(indexoid, lockmode);
for (int i = 0; i < indrel->rd_index->indnatts; i++)
{
if (indrel->rd_index->indkey.values[i] == attnum)
{
indattnum = i + 1;
break;
}
}
if (indattnum == 0)
{
index_close(indrel, lockmode);
continue;
}
tuple = SearchSysCacheCopyAttNum(RelationGetRelid(indrel), indattnum);
if (HeapTupleIsValid(tuple))
{
attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
attrtuple->attstorage = newstorage;
CatalogTupleUpdate(attrelation, &tuple->t_self, tuple);
InvokeObjectPostAlterHook(RelationRelationId,
RelationGetRelid(rel),
attrtuple->attnum);
heap_freetuple(tuple);
}
index_close(indrel, lockmode);
}
SetIndexStorageProperties(rel, attrelation, attnum,
InvalidCompressionMethod,
newstorage, lockmode);
table_close(attrelation, RowExclusiveLock);
@ -11859,6 +11955,23 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
ReleaseSysCache(typeTuple);
/* Setup attribute compression */
if (rel->rd_rel->relkind == RELKIND_RELATION ||
rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
/*
* No compression for plain/external storage, otherwise, default
* compression method if it is not already set, refer comments atop
* attcompression parameter in pg_attribute.h.
*/
if (!IsStorageCompressible(tform->typstorage))
attTup->attcompression = InvalidCompressionMethod;
else if (!CompressionMethodIsValid(attTup->attcompression))
attTup->attcompression = GetDefaultToastCompression();
}
else
attTup->attcompression = InvalidCompressionMethod;
CatalogTupleUpdate(attrelation, &heapTup->t_self, heapTup);
table_close(attrelation, RowExclusiveLock);
@ -14939,6 +15052,89 @@ ATExecGenericOptions(Relation rel, List *options)
heap_freetuple(tuple);
}
/*
* ALTER TABLE ALTER COLUMN SET COMPRESSION
*
* Return value is the address of the modified column
*/
static ObjectAddress
ATExecSetCompression(AlteredTableInfo *tab,
Relation rel,
const char *column,
Node *newValue,
LOCKMODE lockmode)
{
Relation attrel;
HeapTuple tuple;
Form_pg_attribute atttableform;
AttrNumber attnum;
char *compression;
char typstorage;
Oid cmoid;
Datum values[Natts_pg_attribute];
bool nulls[Natts_pg_attribute];
bool replace[Natts_pg_attribute];
ObjectAddress address;
Assert(IsA(newValue, String));
compression = strVal(newValue);
attrel = table_open(AttributeRelationId, RowExclusiveLock);
tuple = SearchSysCacheAttName(RelationGetRelid(rel), column);
if (!HeapTupleIsValid(tuple))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" of relation \"%s\" does not exist",
column, RelationGetRelationName(rel))));
/* prevent them from altering a system attribute */
atttableform = (Form_pg_attribute) GETSTRUCT(tuple);
attnum = atttableform->attnum;
if (attnum <= 0)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot alter system column \"%s\"", column)));
typstorage = get_typstorage(atttableform->atttypid);
/* prevent from setting compression methods for uncompressible type */
if (!IsStorageCompressible(typstorage))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("column data type %s does not support compression",
format_type_be(atttableform->atttypid))));
/* initialize buffers for new tuple values */
memset(values, 0, sizeof(values));
memset(nulls, false, sizeof(nulls));
memset(replace, false, sizeof(replace));
/* get the attribute compression method. */
cmoid = GetAttributeCompression(atttableform, compression);
atttableform->attcompression = cmoid;
CatalogTupleUpdate(attrel, &tuple->t_self, tuple);
InvokeObjectPostAlterHook(RelationRelationId,
RelationGetRelid(rel),
atttableform->attnum);
ReleaseSysCache(tuple);
/* apply changes to the index column as well */
SetIndexStorageProperties(rel, attrel, attnum, cmoid, '\0', lockmode);
table_close(attrel, RowExclusiveLock);
/* make changes visible */
CommandCounterIncrement();
ObjectAddressSubSet(address, RelationRelationId,
RelationGetRelid(rel), atttableform->attnum);
return address;
}
/*
* Preparation phase for SET LOGGED/UNLOGGED
*
@ -17641,3 +17837,36 @@ ATExecAlterCollationRefreshVersion(Relation rel, List *coll)
index_update_collation_versions(rel->rd_id, get_collation_oid(coll, false));
CacheInvalidateRelcache(rel);
}
/*
* resolve column compression specification to compression method.
*/
static char
GetAttributeCompression(Form_pg_attribute att, char *compression)
{
char typstorage = get_typstorage(att->atttypid);
char cmethod;
/*
* No compression for plain/external storage, refer comments atop
* attcompression parameter in pg_attribute.h
*/
if (!IsStorageCompressible(typstorage))
{
if (compression == NULL)
return InvalidCompressionMethod;
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("column data type %s does not support compression",
format_type_be(att->atttypid))));
}
/* fallback to default compression if it's not specified */
if (compression == NULL)
cmethod = GetDefaultToastCompression();
else
cmethod = CompressionNameToMethod(compression);
return cmethod;
}