mirror of
https://github.com/postgres/postgres.git
synced 2025-07-05 07:21:24 +03:00
Allow backends to start up without use of the flat-file copy of pg_database.
To make this work in the base case, pg_database now has a nailed-in-cache relation descriptor that is initialized using hardwired knowledge in relcache.c. This means pg_database is added to the set of relations that need to have a Schema_pg_xxx macro maintained in pg_attribute.h. When this path is taken, we'll have to do a seqscan of pg_database to find the row we need. In the normal case, we are able to do an indexscan to find the database's row by name. This is made possible by storing a global relcache init file that describes only the shared catalogs and their indexes (and therefore is usable by all backends in any database). A new backend loads this cache file, finds its database OID after an indexscan on pg_database, and then loads the local relcache init file for that database. This change should effectively eliminate number of databases as a factor in backend startup time, even with large numbers of databases. However, the real reason for doing it is as a first step towards getting rid of the flat files altogether. There are still several other sub-projects to be tackled before that can happen.
This commit is contained in:
@ -8,7 +8,7 @@
|
||||
*
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* $PostgreSQL: pgsql/src/backend/utils/init/postinit.c,v 1.193 2009/07/31 20:26:23 tgl Exp $
|
||||
* $PostgreSQL: pgsql/src/backend/utils/init/postinit.c,v 1.194 2009/08/12 20:53:30 tgl Exp $
|
||||
*
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
@ -19,20 +19,20 @@
|
||||
#include <unistd.h>
|
||||
|
||||
#include "access/heapam.h"
|
||||
#include "access/sysattr.h"
|
||||
#include "access/xact.h"
|
||||
#include "catalog/catalog.h"
|
||||
#include "catalog/indexing.h"
|
||||
#include "catalog/namespace.h"
|
||||
#include "catalog/pg_authid.h"
|
||||
#include "catalog/pg_database.h"
|
||||
#include "catalog/pg_tablespace.h"
|
||||
#include "libpq/hba.h"
|
||||
#include "libpq/libpq-be.h"
|
||||
#include "mb/pg_wchar.h"
|
||||
#include "miscadmin.h"
|
||||
#include "pgstat.h"
|
||||
#include "postmaster/autovacuum.h"
|
||||
#include "postmaster/postmaster.h"
|
||||
#include "storage/backendid.h"
|
||||
#include "storage/bufmgr.h"
|
||||
#include "storage/fd.h"
|
||||
#include "storage/ipc.h"
|
||||
@ -43,19 +43,17 @@
|
||||
#include "storage/sinvaladt.h"
|
||||
#include "storage/smgr.h"
|
||||
#include "utils/acl.h"
|
||||
#include "utils/flatfiles.h"
|
||||
#include "utils/fmgroids.h"
|
||||
#include "utils/guc.h"
|
||||
#include "utils/pg_locale.h"
|
||||
#include "utils/plancache.h"
|
||||
#include "utils/portal.h"
|
||||
#include "utils/relcache.h"
|
||||
#include "utils/snapmgr.h"
|
||||
#include "utils/syscache.h"
|
||||
#include "utils/tqual.h"
|
||||
|
||||
|
||||
static bool FindMyDatabase(const char *name, Oid *db_id, Oid *db_tablespace);
|
||||
static bool FindMyDatabaseByOid(Oid dbid, char *dbname, Oid *db_tablespace);
|
||||
static HeapTuple GetDatabaseTuple(const char *dbname);
|
||||
static HeapTuple GetDatabaseTupleByOid(Oid dboid);
|
||||
static void CheckMyDatabase(const char *name, bool am_superuser);
|
||||
static void InitCommunication(void);
|
||||
static void ShutdownPostgres(int code, Datum arg);
|
||||
@ -66,90 +64,97 @@ static bool ThereIsAtLeastOneRole(void);
|
||||
|
||||
|
||||
/*
|
||||
* FindMyDatabase -- get the critical info needed to locate my database
|
||||
* GetDatabaseTuple -- fetch the pg_database row for a database
|
||||
*
|
||||
* Find the named database in pg_database, return its database OID and the
|
||||
* OID of its default tablespace. Return TRUE if found, FALSE if not.
|
||||
*
|
||||
* Since we are not yet up and running as a backend, we cannot look directly
|
||||
* at pg_database (we can't obtain locks nor participate in transactions).
|
||||
* So to get the info we need before starting up, we must look at the "flat
|
||||
* file" copy of pg_database that is helpfully maintained by flatfiles.c.
|
||||
* This is subject to various race conditions, so after we have the
|
||||
* transaction infrastructure started, we have to recheck the information;
|
||||
* see InitPostgres.
|
||||
* This is used during backend startup when we don't yet have any access to
|
||||
* system catalogs in general. In the worst case, we can seqscan pg_database
|
||||
* using nothing but the hard-wired descriptor that relcache.c creates for
|
||||
* pg_database. In more typical cases, relcache.c was able to load
|
||||
* descriptors for both pg_database and its indexes from the shared relcache
|
||||
* cache file, and so we can do an indexscan. criticalSharedRelcachesBuilt
|
||||
* tells whether we got the cached descriptors.
|
||||
*/
|
||||
static bool
|
||||
FindMyDatabase(const char *name, Oid *db_id, Oid *db_tablespace)
|
||||
static HeapTuple
|
||||
GetDatabaseTuple(const char *dbname)
|
||||
{
|
||||
bool result = false;
|
||||
char *filename;
|
||||
FILE *db_file;
|
||||
char thisname[NAMEDATALEN];
|
||||
TransactionId db_frozenxid;
|
||||
HeapTuple tuple;
|
||||
Relation relation;
|
||||
SysScanDesc scan;
|
||||
ScanKeyData key[1];
|
||||
|
||||
filename = database_getflatfilename();
|
||||
db_file = AllocateFile(filename, "r");
|
||||
if (db_file == NULL)
|
||||
ereport(FATAL,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not open file \"%s\": %m", filename)));
|
||||
/*
|
||||
* form a scan key
|
||||
*/
|
||||
ScanKeyInit(&key[0],
|
||||
Anum_pg_database_datname,
|
||||
BTEqualStrategyNumber, F_NAMEEQ,
|
||||
CStringGetDatum(dbname));
|
||||
|
||||
while (read_pg_database_line(db_file, thisname, db_id,
|
||||
db_tablespace, &db_frozenxid))
|
||||
{
|
||||
if (strcmp(thisname, name) == 0)
|
||||
{
|
||||
result = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
/*
|
||||
* Open pg_database and fetch a tuple. Force heap scan if we haven't yet
|
||||
* built the critical shared relcache entries (i.e., we're starting up
|
||||
* without a shared relcache cache file).
|
||||
*/
|
||||
relation = heap_open(DatabaseRelationId, AccessShareLock);
|
||||
scan = systable_beginscan(relation, DatabaseNameIndexId,
|
||||
criticalSharedRelcachesBuilt,
|
||||
SnapshotNow,
|
||||
1, key);
|
||||
|
||||
FreeFile(db_file);
|
||||
pfree(filename);
|
||||
tuple = systable_getnext(scan);
|
||||
|
||||
return result;
|
||||
/* Must copy tuple before releasing buffer */
|
||||
if (HeapTupleIsValid(tuple))
|
||||
tuple = heap_copytuple(tuple);
|
||||
|
||||
/* all done */
|
||||
systable_endscan(scan);
|
||||
heap_close(relation, AccessShareLock);
|
||||
|
||||
return tuple;
|
||||
}
|
||||
|
||||
/*
|
||||
* FindMyDatabaseByOid
|
||||
*
|
||||
* As above, but the actual database Id is known. Return its name and the
|
||||
* tablespace OID. Return TRUE if found, FALSE if not. The same restrictions
|
||||
* as FindMyDatabase apply.
|
||||
* GetDatabaseTupleByOid -- as above, but search by database OID
|
||||
*/
|
||||
static bool
|
||||
FindMyDatabaseByOid(Oid dbid, char *dbname, Oid *db_tablespace)
|
||||
static HeapTuple
|
||||
GetDatabaseTupleByOid(Oid dboid)
|
||||
{
|
||||
bool result = false;
|
||||
char *filename;
|
||||
FILE *db_file;
|
||||
Oid db_id;
|
||||
char thisname[NAMEDATALEN];
|
||||
TransactionId db_frozenxid;
|
||||
HeapTuple tuple;
|
||||
Relation relation;
|
||||
SysScanDesc scan;
|
||||
ScanKeyData key[1];
|
||||
|
||||
filename = database_getflatfilename();
|
||||
db_file = AllocateFile(filename, "r");
|
||||
if (db_file == NULL)
|
||||
ereport(FATAL,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not open file \"%s\": %m", filename)));
|
||||
/*
|
||||
* form a scan key
|
||||
*/
|
||||
ScanKeyInit(&key[0],
|
||||
ObjectIdAttributeNumber,
|
||||
BTEqualStrategyNumber, F_OIDEQ,
|
||||
ObjectIdGetDatum(dboid));
|
||||
|
||||
while (read_pg_database_line(db_file, thisname, &db_id,
|
||||
db_tablespace, &db_frozenxid))
|
||||
{
|
||||
if (dbid == db_id)
|
||||
{
|
||||
result = true;
|
||||
strlcpy(dbname, thisname, NAMEDATALEN);
|
||||
break;
|
||||
}
|
||||
}
|
||||
/*
|
||||
* Open pg_database and fetch a tuple. Force heap scan if we haven't yet
|
||||
* built the critical shared relcache entries (i.e., we're starting up
|
||||
* without a shared relcache cache file).
|
||||
*/
|
||||
relation = heap_open(DatabaseRelationId, AccessShareLock);
|
||||
scan = systable_beginscan(relation, DatabaseOidIndexId,
|
||||
criticalSharedRelcachesBuilt,
|
||||
SnapshotNow,
|
||||
1, key);
|
||||
|
||||
FreeFile(db_file);
|
||||
pfree(filename);
|
||||
tuple = systable_getnext(scan);
|
||||
|
||||
return result;
|
||||
/* Must copy tuple before releasing buffer */
|
||||
if (HeapTupleIsValid(tuple))
|
||||
tuple = heap_copytuple(tuple);
|
||||
|
||||
/* all done */
|
||||
systable_endscan(scan);
|
||||
heap_close(relation, AccessShareLock);
|
||||
|
||||
return tuple;
|
||||
}
|
||||
|
||||
|
||||
@ -164,7 +169,7 @@ CheckMyDatabase(const char *name, bool am_superuser)
|
||||
char *collate;
|
||||
char *ctype;
|
||||
|
||||
/* Fetch our real pg_database row */
|
||||
/* Fetch our pg_database row normally, via syscache */
|
||||
tup = SearchSysCache(DATABASEOID,
|
||||
ObjectIdGetDatum(MyDatabaseId),
|
||||
0, 0, 0);
|
||||
@ -356,8 +361,9 @@ BaseInit(void)
|
||||
* Initialize POSTGRES.
|
||||
*
|
||||
* The database can be specified by name, using the in_dbname parameter, or by
|
||||
* OID, using the dboid parameter. In the latter case, the computed database
|
||||
* name is passed out to the caller as a palloc'ed string in out_dbname.
|
||||
* OID, using the dboid parameter. In the latter case, the actual database
|
||||
* name can be returned to the caller in out_dbname. If out_dbname isn't
|
||||
* NULL, it must point to a buffer of size NAMEDATALEN.
|
||||
*
|
||||
* In bootstrap mode no parameters are used.
|
||||
*
|
||||
@ -366,7 +372,7 @@ BaseInit(void)
|
||||
* the startup transaction rather than doing a separate one in postgres.c.)
|
||||
*
|
||||
* As of PostgreSQL 8.2, we expect InitProcess() was already called, so we
|
||||
* already have a PGPROC struct ... but it's not filled in yet.
|
||||
* already have a PGPROC struct ... but it's not completely filled in yet.
|
||||
*
|
||||
* Note:
|
||||
* Be very careful with the order of calls in the InitPostgres function.
|
||||
@ -374,7 +380,7 @@ BaseInit(void)
|
||||
*/
|
||||
bool
|
||||
InitPostgres(const char *in_dbname, Oid dboid, const char *username,
|
||||
char **out_dbname)
|
||||
char *out_dbname)
|
||||
{
|
||||
bool bootstrap = IsBootstrapProcessingMode();
|
||||
bool autovacuum = IsAutoVacuumWorkerProcess();
|
||||
@ -383,57 +389,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
|
||||
char dbname[NAMEDATALEN];
|
||||
|
||||
/*
|
||||
* Set up the global variables holding database id and path. But note we
|
||||
* won't actually try to touch the database just yet.
|
||||
*
|
||||
* We take a shortcut in the bootstrap case, otherwise we have to look up
|
||||
* the db name in pg_database.
|
||||
*/
|
||||
if (bootstrap)
|
||||
{
|
||||
MyDatabaseId = TemplateDbOid;
|
||||
MyDatabaseTableSpace = DEFAULTTABLESPACE_OID;
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Find tablespace of the database we're about to open. Since we're
|
||||
* not yet up and running we have to use one of the hackish
|
||||
* FindMyDatabase variants, which look in the flat-file copy of
|
||||
* pg_database.
|
||||
*
|
||||
* If the in_dbname param is NULL, lookup database by OID.
|
||||
*/
|
||||
if (in_dbname == NULL)
|
||||
{
|
||||
if (!FindMyDatabaseByOid(dboid, dbname, &MyDatabaseTableSpace))
|
||||
ereport(FATAL,
|
||||
(errcode(ERRCODE_UNDEFINED_DATABASE),
|
||||
errmsg("database %u does not exist", dboid)));
|
||||
MyDatabaseId = dboid;
|
||||
/* pass the database name to the caller */
|
||||
*out_dbname = pstrdup(dbname);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!FindMyDatabase(in_dbname, &MyDatabaseId, &MyDatabaseTableSpace))
|
||||
ereport(FATAL,
|
||||
(errcode(ERRCODE_UNDEFINED_DATABASE),
|
||||
errmsg("database \"%s\" does not exist",
|
||||
in_dbname)));
|
||||
/* our database name is gotten from the caller */
|
||||
strlcpy(dbname, in_dbname, NAMEDATALEN);
|
||||
}
|
||||
}
|
||||
|
||||
fullpath = GetDatabasePath(MyDatabaseId, MyDatabaseTableSpace);
|
||||
|
||||
SetDatabasePath(fullpath);
|
||||
|
||||
/*
|
||||
* Finish filling in the PGPROC struct, and add it to the ProcArray. (We
|
||||
* need to know MyDatabaseId before we can do this, since it's entered
|
||||
* into the PGPROC struct.)
|
||||
* Add my PGPROC struct to the ProcArray.
|
||||
*
|
||||
* Once I have done this, I am visible to other backends!
|
||||
*/
|
||||
@ -507,10 +463,69 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
|
||||
}
|
||||
|
||||
/*
|
||||
* Now that we have a transaction, we can take locks. Take a writer's
|
||||
* lock on the database we are trying to connect to. If there is a
|
||||
* concurrently running DROP DATABASE on that database, this will block us
|
||||
* until it finishes (and has updated the flat file copy of pg_database).
|
||||
* Load relcache entries for the shared system catalogs. This must
|
||||
* create at least an entry for pg_database.
|
||||
*/
|
||||
RelationCacheInitializePhase2();
|
||||
|
||||
/*
|
||||
* Set up the global variables holding database id and default tablespace.
|
||||
* But note we won't actually try to touch the database just yet.
|
||||
*
|
||||
* We take a shortcut in the bootstrap case, otherwise we have to look up
|
||||
* the db's entry in pg_database.
|
||||
*/
|
||||
if (bootstrap)
|
||||
{
|
||||
MyDatabaseId = TemplateDbOid;
|
||||
MyDatabaseTableSpace = DEFAULTTABLESPACE_OID;
|
||||
}
|
||||
else if (in_dbname != NULL)
|
||||
{
|
||||
HeapTuple tuple;
|
||||
Form_pg_database dbform;
|
||||
|
||||
tuple = GetDatabaseTuple(in_dbname);
|
||||
if (!HeapTupleIsValid(tuple))
|
||||
ereport(FATAL,
|
||||
(errcode(ERRCODE_UNDEFINED_DATABASE),
|
||||
errmsg("database \"%s\" does not exist", in_dbname)));
|
||||
dbform = (Form_pg_database) GETSTRUCT(tuple);
|
||||
MyDatabaseId = HeapTupleGetOid(tuple);
|
||||
MyDatabaseTableSpace = dbform->dattablespace;
|
||||
/* take database name from the caller, just for paranoia */
|
||||
strlcpy(dbname, in_dbname, sizeof(dbname));
|
||||
}
|
||||
else
|
||||
{
|
||||
/* caller specified database by OID */
|
||||
HeapTuple tuple;
|
||||
Form_pg_database dbform;
|
||||
|
||||
tuple = GetDatabaseTupleByOid(dboid);
|
||||
if (!HeapTupleIsValid(tuple))
|
||||
ereport(FATAL,
|
||||
(errcode(ERRCODE_UNDEFINED_DATABASE),
|
||||
errmsg("database %u does not exist", dboid)));
|
||||
dbform = (Form_pg_database) GETSTRUCT(tuple);
|
||||
MyDatabaseId = HeapTupleGetOid(tuple);
|
||||
MyDatabaseTableSpace = dbform->dattablespace;
|
||||
Assert(MyDatabaseId == dboid);
|
||||
strlcpy(dbname, NameStr(dbform->datname), sizeof(dbname));
|
||||
/* pass the database name back to the caller */
|
||||
if (out_dbname)
|
||||
strcpy(out_dbname, dbname);
|
||||
}
|
||||
|
||||
/* Now we can mark our PGPROC entry with the database ID */
|
||||
/* (We assume this is an atomic store so no lock is needed) */
|
||||
MyProc->databaseId = MyDatabaseId;
|
||||
|
||||
/*
|
||||
* Now, take a writer's lock on the database we are trying to connect to.
|
||||
* If there is a concurrently running DROP DATABASE on that database,
|
||||
* this will block us until it finishes (and has committed its update of
|
||||
* pg_database).
|
||||
*
|
||||
* Note that the lock is not held long, only until the end of this startup
|
||||
* transaction. This is OK since we are already advertising our use of
|
||||
@ -528,21 +543,21 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
|
||||
RowExclusiveLock);
|
||||
|
||||
/*
|
||||
* Recheck the flat file copy of pg_database to make sure the target
|
||||
* database hasn't gone away. If there was a concurrent DROP DATABASE,
|
||||
* this ensures we will die cleanly without creating a mess.
|
||||
* Recheck pg_database to make sure the target database hasn't gone away.
|
||||
* If there was a concurrent DROP DATABASE, this ensures we will die
|
||||
* cleanly without creating a mess.
|
||||
*/
|
||||
if (!bootstrap)
|
||||
{
|
||||
Oid dbid2;
|
||||
Oid tsid2;
|
||||
HeapTuple tuple;
|
||||
|
||||
if (!FindMyDatabase(dbname, &dbid2, &tsid2) ||
|
||||
dbid2 != MyDatabaseId || tsid2 != MyDatabaseTableSpace)
|
||||
tuple = GetDatabaseTuple(dbname);
|
||||
if (!HeapTupleIsValid(tuple) ||
|
||||
MyDatabaseId != HeapTupleGetOid(tuple) ||
|
||||
MyDatabaseTableSpace != ((Form_pg_database) GETSTRUCT(tuple))->dattablespace)
|
||||
ereport(FATAL,
|
||||
(errcode(ERRCODE_UNDEFINED_DATABASE),
|
||||
errmsg("database \"%s\" does not exist",
|
||||
dbname),
|
||||
errmsg("database \"%s\" does not exist", dbname),
|
||||
errdetail("It seems to have just been dropped or renamed.")));
|
||||
}
|
||||
|
||||
@ -550,6 +565,8 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
|
||||
* Now we should be able to access the database directory safely. Verify
|
||||
* it's there and looks reasonable.
|
||||
*/
|
||||
fullpath = GetDatabasePath(MyDatabaseId, MyDatabaseTableSpace);
|
||||
|
||||
if (!bootstrap)
|
||||
{
|
||||
if (access(fullpath, F_OK) == -1)
|
||||
@ -571,13 +588,15 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
|
||||
ValidatePgVersion(fullpath);
|
||||
}
|
||||
|
||||
SetDatabasePath(fullpath);
|
||||
|
||||
/*
|
||||
* It's now possible to do real access to the system catalogs.
|
||||
*
|
||||
* Load relcache entries for the system catalogs. This must create at
|
||||
* least the minimum set of "nailed-in" cache entries.
|
||||
*/
|
||||
RelationCacheInitializePhase2();
|
||||
RelationCacheInitializePhase3();
|
||||
|
||||
/*
|
||||
* Figure out our postgres user id, and see if we are a superuser.
|
||||
@ -612,7 +631,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
|
||||
initialize_acl();
|
||||
|
||||
/*
|
||||
* Read the real pg_database row for our database, check permissions and
|
||||
* Re-read the pg_database row for our database, check permissions and
|
||||
* set up database-specific GUC settings. We can't do this until all the
|
||||
* database-access infrastructure is up. (Also, it wants to know if the
|
||||
* user is a superuser, so the above stuff has to happen first.)
|
||||
|
Reference in New Issue
Block a user