mirror of
https://github.com/postgres/postgres.git
synced 2025-05-02 11:44:50 +03:00
pg_dump: Remove global connection pointer.
Parallel pg_dump wants to have multiple ArchiveHandle objects, and therefore multiple PGconns, in play at the same time. This should be just about the end of the refactoring that we need in order to make that workable.
This commit is contained in:
parent
549e93c990
commit
689d0eb7db
@ -159,13 +159,14 @@ typedef struct _restoreOptions
|
|||||||
* Main archiver interface.
|
* Main archiver interface.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
extern PGconn *ConnectDatabase(Archive *AH,
|
extern void ConnectDatabase(Archive *AH,
|
||||||
const char *dbname,
|
const char *dbname,
|
||||||
const char *pghost,
|
const char *pghost,
|
||||||
const char *pgport,
|
const char *pgport,
|
||||||
const char *username,
|
const char *username,
|
||||||
enum trivalue prompt_password);
|
enum trivalue prompt_password);
|
||||||
extern void DisconnectDatabase(Archive *AHX);
|
extern void DisconnectDatabase(Archive *AHX);
|
||||||
|
extern PGconn *GetConnection(Archive *AHX);
|
||||||
|
|
||||||
/* Called to add a TOC entry */
|
/* Called to add a TOC entry */
|
||||||
extern void ArchiveEntry(Archive *AHX,
|
extern void ArchiveEntry(Archive *AHX,
|
||||||
|
@ -225,7 +225,7 @@ _connectDB(ArchiveHandle *AH, const char *reqdb, const char *requser)
|
|||||||
* cache if the username keeps changing. In current usage, however, the
|
* cache if the username keeps changing. In current usage, however, the
|
||||||
* username never does change, so one savedPassword is sufficient.
|
* username never does change, so one savedPassword is sufficient.
|
||||||
*/
|
*/
|
||||||
PGconn *
|
void
|
||||||
ConnectDatabase(Archive *AHX,
|
ConnectDatabase(Archive *AHX,
|
||||||
const char *dbname,
|
const char *dbname,
|
||||||
const char *pghost,
|
const char *pghost,
|
||||||
@ -306,8 +306,6 @@ ConnectDatabase(Archive *AHX,
|
|||||||
_check_database_version(AH);
|
_check_database_version(AH);
|
||||||
|
|
||||||
PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
|
PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
|
||||||
|
|
||||||
return AH->connection;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
@ -319,6 +317,13 @@ DisconnectDatabase(Archive *AHX)
|
|||||||
AH->connection = NULL;
|
AH->connection = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
PGconn *
|
||||||
|
GetConnection(Archive *AHX)
|
||||||
|
{
|
||||||
|
ArchiveHandle *AH = (ArchiveHandle *) AHX;
|
||||||
|
|
||||||
|
return AH->connection;
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
notice_processor(void *arg, const char *message)
|
notice_processor(void *arg, const char *message)
|
||||||
|
@ -86,7 +86,6 @@ typedef struct
|
|||||||
/* global decls */
|
/* global decls */
|
||||||
bool g_verbose; /* User wants verbose narration of our
|
bool g_verbose; /* User wants verbose narration of our
|
||||||
* activities. */
|
* activities. */
|
||||||
PGconn *g_conn; /* the database connection */
|
|
||||||
|
|
||||||
/* various user-settable parameters */
|
/* various user-settable parameters */
|
||||||
bool schemaOnly;
|
bool schemaOnly;
|
||||||
@ -614,9 +613,7 @@ main(int argc, char **argv)
|
|||||||
* Open the database using the Archiver, so it knows about it. Errors mean
|
* Open the database using the Archiver, so it knows about it. Errors mean
|
||||||
* death.
|
* death.
|
||||||
*/
|
*/
|
||||||
g_conn = ConnectDatabase(fout, dbname, pghost, pgport,
|
ConnectDatabase(fout, dbname, pghost, pgport, username, prompt_password);
|
||||||
username, prompt_password);
|
|
||||||
|
|
||||||
setup_connection(fout, dumpencoding, use_role);
|
setup_connection(fout, dumpencoding, use_role);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -657,7 +654,8 @@ main(int argc, char **argv)
|
|||||||
if (fout->remoteVersion < 70300)
|
if (fout->remoteVersion < 70300)
|
||||||
{
|
{
|
||||||
if (fout->remoteVersion >= 70100)
|
if (fout->remoteVersion >= 70100)
|
||||||
g_last_builtin_oid = findLastBuiltinOid_V71(fout, PQdb(g_conn));
|
g_last_builtin_oid = findLastBuiltinOid_V71(fout,
|
||||||
|
PQdb(GetConnection(fout)));
|
||||||
else
|
else
|
||||||
g_last_builtin_oid = findLastBuiltinOid_V70(fout);
|
g_last_builtin_oid = findLastBuiltinOid_V70(fout);
|
||||||
if (g_verbose)
|
if (g_verbose)
|
||||||
@ -870,12 +868,13 @@ pgdump_cleanup_at_exit(int code, void *arg)
|
|||||||
static void
|
static void
|
||||||
setup_connection(Archive *AH, const char *dumpencoding, char *use_role)
|
setup_connection(Archive *AH, const char *dumpencoding, char *use_role)
|
||||||
{
|
{
|
||||||
|
PGconn *conn = GetConnection(AH);
|
||||||
const char *std_strings;
|
const char *std_strings;
|
||||||
|
|
||||||
/* Set the client encoding if requested */
|
/* Set the client encoding if requested */
|
||||||
if (dumpencoding)
|
if (dumpencoding)
|
||||||
{
|
{
|
||||||
if (PQsetClientEncoding(g_conn, dumpencoding) < 0)
|
if (PQsetClientEncoding(conn, dumpencoding) < 0)
|
||||||
{
|
{
|
||||||
write_msg(NULL, "invalid client encoding \"%s\" specified\n",
|
write_msg(NULL, "invalid client encoding \"%s\" specified\n",
|
||||||
dumpencoding);
|
dumpencoding);
|
||||||
@ -887,9 +886,9 @@ setup_connection(Archive *AH, const char *dumpencoding, char *use_role)
|
|||||||
* Get the active encoding and the standard_conforming_strings setting, so
|
* Get the active encoding and the standard_conforming_strings setting, so
|
||||||
* we know how to escape strings.
|
* we know how to escape strings.
|
||||||
*/
|
*/
|
||||||
AH->encoding = PQclientEncoding(g_conn);
|
AH->encoding = PQclientEncoding(conn);
|
||||||
|
|
||||||
std_strings = PQparameterStatus(g_conn, "standard_conforming_strings");
|
std_strings = PQparameterStatus(conn, "standard_conforming_strings");
|
||||||
AH->std_strings = (std_strings && strcmp(std_strings, "on") == 0);
|
AH->std_strings = (std_strings && strcmp(std_strings, "on") == 0);
|
||||||
|
|
||||||
/* Set the role if requested */
|
/* Set the role if requested */
|
||||||
@ -1018,9 +1017,8 @@ expand_schema_name_patterns(Archive *fout,
|
|||||||
appendPQExpBuffer(query, "UNION ALL\n");
|
appendPQExpBuffer(query, "UNION ALL\n");
|
||||||
appendPQExpBuffer(query,
|
appendPQExpBuffer(query,
|
||||||
"SELECT oid FROM pg_catalog.pg_namespace n\n");
|
"SELECT oid FROM pg_catalog.pg_namespace n\n");
|
||||||
processSQLNamePattern(g_conn, query, cell->val, false, false,
|
processSQLNamePattern(GetConnection(fout), query, cell->val, false,
|
||||||
NULL, "n.nspname", NULL,
|
false, NULL, "n.nspname", NULL, NULL);
|
||||||
NULL);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
|
res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
|
||||||
@ -1068,8 +1066,8 @@ expand_table_name_patterns(Archive *fout,
|
|||||||
"\nWHERE c.relkind in ('%c', '%c', '%c', '%c')\n",
|
"\nWHERE c.relkind in ('%c', '%c', '%c', '%c')\n",
|
||||||
RELKIND_RELATION, RELKIND_SEQUENCE, RELKIND_VIEW,
|
RELKIND_RELATION, RELKIND_SEQUENCE, RELKIND_VIEW,
|
||||||
RELKIND_FOREIGN_TABLE);
|
RELKIND_FOREIGN_TABLE);
|
||||||
processSQLNamePattern(g_conn, query, cell->val, true, false,
|
processSQLNamePattern(GetConnection(fout), query, cell->val, true,
|
||||||
"n.nspname", "c.relname", NULL,
|
false, "n.nspname", "c.relname", NULL,
|
||||||
"pg_catalog.pg_table_is_visible(c.oid)");
|
"pg_catalog.pg_table_is_visible(c.oid)");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1266,6 +1264,7 @@ dumpTableData_copy(Archive *fout, void *dcontext)
|
|||||||
const bool hasoids = tbinfo->hasoids;
|
const bool hasoids = tbinfo->hasoids;
|
||||||
const bool oids = tdinfo->oids;
|
const bool oids = tdinfo->oids;
|
||||||
PQExpBuffer q = createPQExpBuffer();
|
PQExpBuffer q = createPQExpBuffer();
|
||||||
|
PGconn *conn = GetConnection(fout);
|
||||||
PGresult *res;
|
PGresult *res;
|
||||||
int ret;
|
int ret;
|
||||||
char *copybuf;
|
char *copybuf;
|
||||||
@ -1332,7 +1331,7 @@ dumpTableData_copy(Archive *fout, void *dcontext)
|
|||||||
|
|
||||||
for (;;)
|
for (;;)
|
||||||
{
|
{
|
||||||
ret = PQgetCopyData(g_conn, ©buf, 0);
|
ret = PQgetCopyData(conn, ©buf, 0);
|
||||||
|
|
||||||
if (ret < 0)
|
if (ret < 0)
|
||||||
break; /* done or error */
|
break; /* done or error */
|
||||||
@ -1395,17 +1394,17 @@ dumpTableData_copy(Archive *fout, void *dcontext)
|
|||||||
{
|
{
|
||||||
/* copy data transfer failed */
|
/* copy data transfer failed */
|
||||||
write_msg(NULL, "Dumping the contents of table \"%s\" failed: PQgetCopyData() failed.\n", classname);
|
write_msg(NULL, "Dumping the contents of table \"%s\" failed: PQgetCopyData() failed.\n", classname);
|
||||||
write_msg(NULL, "Error message from server: %s", PQerrorMessage(g_conn));
|
write_msg(NULL, "Error message from server: %s", PQerrorMessage(conn));
|
||||||
write_msg(NULL, "The command was: %s\n", q->data);
|
write_msg(NULL, "The command was: %s\n", q->data);
|
||||||
exit_nicely(1);
|
exit_nicely(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Check command status and return to normal libpq state */
|
/* Check command status and return to normal libpq state */
|
||||||
res = PQgetResult(g_conn);
|
res = PQgetResult(conn);
|
||||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
{
|
{
|
||||||
write_msg(NULL, "Dumping the contents of table \"%s\" failed: PQgetResult() failed.\n", classname);
|
write_msg(NULL, "Dumping the contents of table \"%s\" failed: PQgetResult() failed.\n", classname);
|
||||||
write_msg(NULL, "Error message from server: %s", PQerrorMessage(g_conn));
|
write_msg(NULL, "Error message from server: %s", PQerrorMessage(conn));
|
||||||
write_msg(NULL, "The command was: %s\n", q->data);
|
write_msg(NULL, "The command was: %s\n", q->data);
|
||||||
exit_nicely(1);
|
exit_nicely(1);
|
||||||
}
|
}
|
||||||
@ -1830,6 +1829,7 @@ dumpDatabase(Archive *fout)
|
|||||||
PQExpBuffer dbQry = createPQExpBuffer();
|
PQExpBuffer dbQry = createPQExpBuffer();
|
||||||
PQExpBuffer delQry = createPQExpBuffer();
|
PQExpBuffer delQry = createPQExpBuffer();
|
||||||
PQExpBuffer creaQry = createPQExpBuffer();
|
PQExpBuffer creaQry = createPQExpBuffer();
|
||||||
|
PGconn *conn = GetConnection(fout);
|
||||||
PGresult *res;
|
PGresult *res;
|
||||||
int ntups;
|
int ntups;
|
||||||
int i_tableoid,
|
int i_tableoid,
|
||||||
@ -1850,7 +1850,7 @@ dumpDatabase(Archive *fout)
|
|||||||
*tablespace;
|
*tablespace;
|
||||||
uint32 frozenxid;
|
uint32 frozenxid;
|
||||||
|
|
||||||
datname = PQdb(g_conn);
|
datname = PQdb(conn);
|
||||||
|
|
||||||
if (g_verbose)
|
if (g_verbose)
|
||||||
write_msg(NULL, "saving database definition\n");
|
write_msg(NULL, "saving database definition\n");
|
||||||
@ -2150,10 +2150,10 @@ dumpDatabase(Archive *fout)
|
|||||||
{
|
{
|
||||||
PQExpBuffer seclabelQry = createPQExpBuffer();
|
PQExpBuffer seclabelQry = createPQExpBuffer();
|
||||||
|
|
||||||
buildShSecLabelQuery(g_conn, "pg_database", dbCatId.oid, seclabelQry);
|
buildShSecLabelQuery(conn, "pg_database", dbCatId.oid, seclabelQry);
|
||||||
res = ExecuteSqlQuery(fout, seclabelQry->data, PGRES_TUPLES_OK);
|
res = ExecuteSqlQuery(fout, seclabelQry->data, PGRES_TUPLES_OK);
|
||||||
resetPQExpBuffer(seclabelQry);
|
resetPQExpBuffer(seclabelQry);
|
||||||
emitShSecLabels(g_conn, res, seclabelQry, "DATABASE", datname);
|
emitShSecLabels(conn, res, seclabelQry, "DATABASE", datname);
|
||||||
if (strlen(seclabelQry->data))
|
if (strlen(seclabelQry->data))
|
||||||
ArchiveEntry(fout, dbCatId, createDumpId(), datname, NULL, NULL,
|
ArchiveEntry(fout, dbCatId, createDumpId(), datname, NULL, NULL,
|
||||||
dba, false, "SECURITY LABEL", SECTION_NONE,
|
dba, false, "SECURITY LABEL", SECTION_NONE,
|
||||||
@ -2362,6 +2362,7 @@ dumpBlobs(Archive *fout, void *arg)
|
|||||||
{
|
{
|
||||||
const char *blobQry;
|
const char *blobQry;
|
||||||
const char *blobFetchQry;
|
const char *blobFetchQry;
|
||||||
|
PGconn *conn = GetConnection(fout);
|
||||||
PGresult *res;
|
PGresult *res;
|
||||||
char buf[LOBBUFSIZE];
|
char buf[LOBBUFSIZE];
|
||||||
int ntups;
|
int ntups;
|
||||||
@ -2404,11 +2405,11 @@ dumpBlobs(Archive *fout, void *arg)
|
|||||||
|
|
||||||
blobOid = atooid(PQgetvalue(res, i, 0));
|
blobOid = atooid(PQgetvalue(res, i, 0));
|
||||||
/* Open the BLOB */
|
/* Open the BLOB */
|
||||||
loFd = lo_open(g_conn, blobOid, INV_READ);
|
loFd = lo_open(conn, blobOid, INV_READ);
|
||||||
if (loFd == -1)
|
if (loFd == -1)
|
||||||
{
|
{
|
||||||
write_msg(NULL, "could not open large object %u: %s",
|
write_msg(NULL, "could not open large object %u: %s",
|
||||||
blobOid, PQerrorMessage(g_conn));
|
blobOid, PQerrorMessage(conn));
|
||||||
exit_nicely(1);
|
exit_nicely(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2417,18 +2418,18 @@ dumpBlobs(Archive *fout, void *arg)
|
|||||||
/* Now read it in chunks, sending data to archive */
|
/* Now read it in chunks, sending data to archive */
|
||||||
do
|
do
|
||||||
{
|
{
|
||||||
cnt = lo_read(g_conn, loFd, buf, LOBBUFSIZE);
|
cnt = lo_read(conn, loFd, buf, LOBBUFSIZE);
|
||||||
if (cnt < 0)
|
if (cnt < 0)
|
||||||
{
|
{
|
||||||
write_msg(NULL, "error reading large object %u: %s",
|
write_msg(NULL, "error reading large object %u: %s",
|
||||||
blobOid, PQerrorMessage(g_conn));
|
blobOid, PQerrorMessage(conn));
|
||||||
exit_nicely(1);
|
exit_nicely(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
WriteData(fout, buf, cnt);
|
WriteData(fout, buf, cnt);
|
||||||
} while (cnt > 0);
|
} while (cnt > 0);
|
||||||
|
|
||||||
lo_close(g_conn, loFd);
|
lo_close(conn, loFd);
|
||||||
|
|
||||||
EndBlob(fout, blobOid);
|
EndBlob(fout, blobOid);
|
||||||
}
|
}
|
||||||
@ -4298,7 +4299,7 @@ getTables(Archive *fout, int *numTables)
|
|||||||
*/
|
*/
|
||||||
resetPQExpBuffer(query);
|
resetPQExpBuffer(query);
|
||||||
appendPQExpBuffer(query, "SET statement_timeout = ");
|
appendPQExpBuffer(query, "SET statement_timeout = ");
|
||||||
appendStringLiteralConn(query, lockWaitTimeout, g_conn);
|
appendStringLiteralConn(query, lockWaitTimeout, GetConnection(fout));
|
||||||
ExecuteSqlStatement(fout, query->data);
|
ExecuteSqlStatement(fout, query->data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user