mirror of
https://github.com/postgres/postgres.git
synced 2025-07-02 09:02:37 +03:00
Add support for piping COPY to/from an external program.
This includes backend "COPY TO/FROM PROGRAM '...'" syntax, and corresponding psql \copy syntax. Like with reading/writing files, the backend version is superuser-only, and in the psql version, the program is run in the client. In the passing, the psql \copy STDIN/STDOUT syntax is subtly changed: if you the stdin/stdout is quoted, it's now interpreted as a filename. For example, "\copy foo from 'stdin'" now reads from a file called 'stdin', not from standard input. Before this, there was no way to specify a filename called stdin, stdout, pstdin or pstdout. This creates a new function in pgport, wait_result_to_str(), which can be used to convert the exit status of a process, as returned by wait(3), to a human-readable string. Etsuro Fujita, reviewed by Amit Kapila.
This commit is contained in:
@ -58,7 +58,7 @@
|
||||
*/
|
||||
typedef enum CopyDest
|
||||
{
|
||||
COPY_FILE, /* to/from file */
|
||||
COPY_FILE, /* to/from file (or a piped program) */
|
||||
COPY_OLD_FE, /* to/from frontend (2.0 protocol) */
|
||||
COPY_NEW_FE /* to/from frontend (3.0 protocol) */
|
||||
} CopyDest;
|
||||
@ -108,6 +108,7 @@ typedef struct CopyStateData
|
||||
QueryDesc *queryDesc; /* executable query to copy from */
|
||||
List *attnumlist; /* integer list of attnums to copy */
|
||||
char *filename; /* filename, or NULL for STDIN/STDOUT */
|
||||
bool is_program; /* is 'filename' a program to popen? */
|
||||
bool binary; /* binary format? */
|
||||
bool oids; /* include OIDs? */
|
||||
bool freeze; /* freeze rows on loading? */
|
||||
@ -277,8 +278,10 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
|
||||
static CopyState BeginCopy(bool is_from, Relation rel, Node *raw_query,
|
||||
const char *queryString, List *attnamelist, List *options);
|
||||
static void EndCopy(CopyState cstate);
|
||||
static void ClosePipeToProgram(CopyState cstate);
|
||||
static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString,
|
||||
const char *filename, List *attnamelist, List *options);
|
||||
const char *filename, bool is_program, List *attnamelist,
|
||||
List *options);
|
||||
static void EndCopyTo(CopyState cstate);
|
||||
static uint64 DoCopyTo(CopyState cstate);
|
||||
static uint64 CopyTo(CopyState cstate);
|
||||
@ -482,9 +485,35 @@ CopySendEndOfRow(CopyState cstate)
|
||||
if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
|
||||
cstate->copy_file) != 1 ||
|
||||
ferror(cstate->copy_file))
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not write to COPY file: %m")));
|
||||
{
|
||||
if (cstate->is_program)
|
||||
{
|
||||
if (errno == EPIPE)
|
||||
{
|
||||
/*
|
||||
* The pipe will be closed automatically on error at
|
||||
* the end of transaction, but we might get a better
|
||||
* error message from the subprocess' exit code than
|
||||
* just "Broken Pipe"
|
||||
*/
|
||||
ClosePipeToProgram(cstate);
|
||||
|
||||
/*
|
||||
* If ClosePipeToProgram() didn't throw an error,
|
||||
* the program terminated normally, but closed the
|
||||
* pipe first. Restore errno, and throw an error.
|
||||
*/
|
||||
errno = EPIPE;
|
||||
}
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not write to COPY program: %m")));
|
||||
}
|
||||
else
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not write to COPY file: %m")));
|
||||
}
|
||||
break;
|
||||
case COPY_OLD_FE:
|
||||
/* The FE/BE protocol uses \n as newline for all platforms */
|
||||
@ -752,13 +781,22 @@ DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
|
||||
Relation rel;
|
||||
Oid relid;
|
||||
|
||||
/* Disallow file COPY except to superusers. */
|
||||
/* Disallow COPY to/from file or program except to superusers. */
|
||||
if (!pipe && !superuser())
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
||||
errmsg("must be superuser to COPY to or from a file"),
|
||||
errhint("Anyone can COPY to stdout or from stdin. "
|
||||
"psql's \\copy command also works for anyone.")));
|
||||
{
|
||||
if (stmt->is_program)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
||||
errmsg("must be superuser to COPY to or from an external program"),
|
||||
errhint("Anyone can COPY to stdout or from stdin. "
|
||||
"psql's \\copy command also works for anyone.")));
|
||||
else
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
||||
errmsg("must be superuser to COPY to or from a file"),
|
||||
errhint("Anyone can COPY to stdout or from stdin. "
|
||||
"psql's \\copy command also works for anyone.")));
|
||||
}
|
||||
|
||||
if (stmt->relation)
|
||||
{
|
||||
@ -812,14 +850,15 @@ DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
|
||||
if (XactReadOnly && !rel->rd_islocaltemp)
|
||||
PreventCommandIfReadOnly("COPY FROM");
|
||||
|
||||
cstate = BeginCopyFrom(rel, stmt->filename,
|
||||
cstate = BeginCopyFrom(rel, stmt->filename, stmt->is_program,
|
||||
stmt->attlist, stmt->options);
|
||||
*processed = CopyFrom(cstate); /* copy from file to database */
|
||||
EndCopyFrom(cstate);
|
||||
}
|
||||
else
|
||||
{
|
||||
cstate = BeginCopyTo(rel, stmt->query, queryString, stmt->filename,
|
||||
cstate = BeginCopyTo(rel, stmt->query, queryString,
|
||||
stmt->filename, stmt->is_program,
|
||||
stmt->attlist, stmt->options);
|
||||
*processed = DoCopyTo(cstate); /* copy from database to file */
|
||||
EndCopyTo(cstate);
|
||||
@ -1389,17 +1428,45 @@ BeginCopy(bool is_from,
|
||||
return cstate;
|
||||
}
|
||||
|
||||
/*
|
||||
* Closes the pipe to an external program, checking the pclose() return code.
|
||||
*/
|
||||
static void
|
||||
ClosePipeToProgram(CopyState cstate)
|
||||
{
|
||||
int pclose_rc;
|
||||
|
||||
Assert(cstate->is_program);
|
||||
|
||||
pclose_rc = ClosePipeStream(cstate->copy_file);
|
||||
if (pclose_rc == -1)
|
||||
ereport(ERROR,
|
||||
(errmsg("could not close pipe to external command: %m")));
|
||||
else if (pclose_rc != 0)
|
||||
ereport(ERROR,
|
||||
(errmsg("program \"%s\" failed",
|
||||
cstate->filename),
|
||||
errdetail_internal("%s", wait_result_to_str(pclose_rc))));
|
||||
}
|
||||
|
||||
/*
|
||||
* Release resources allocated in a cstate for COPY TO/FROM.
|
||||
*/
|
||||
static void
|
||||
EndCopy(CopyState cstate)
|
||||
{
|
||||
if (cstate->filename != NULL && FreeFile(cstate->copy_file))
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not close file \"%s\": %m",
|
||||
cstate->filename)));
|
||||
if (cstate->is_program)
|
||||
{
|
||||
ClosePipeToProgram(cstate);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (cstate->filename != NULL && FreeFile(cstate->copy_file))
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not close file \"%s\": %m",
|
||||
cstate->filename)));
|
||||
}
|
||||
|
||||
MemoryContextDelete(cstate->copycontext);
|
||||
pfree(cstate);
|
||||
@ -1413,6 +1480,7 @@ BeginCopyTo(Relation rel,
|
||||
Node *query,
|
||||
const char *queryString,
|
||||
const char *filename,
|
||||
bool is_program,
|
||||
List *attnamelist,
|
||||
List *options)
|
||||
{
|
||||
@ -1451,39 +1519,52 @@ BeginCopyTo(Relation rel,
|
||||
|
||||
if (pipe)
|
||||
{
|
||||
Assert(!is_program); /* the grammar does not allow this */
|
||||
if (whereToSendOutput != DestRemote)
|
||||
cstate->copy_file = stdout;
|
||||
}
|
||||
else
|
||||
{
|
||||
mode_t oumask; /* Pre-existing umask value */
|
||||
struct stat st;
|
||||
|
||||
/*
|
||||
* Prevent write to relative path ... too easy to shoot oneself in the
|
||||
* foot by overwriting a database file ...
|
||||
*/
|
||||
if (!is_absolute_path(filename))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_NAME),
|
||||
errmsg("relative path not allowed for COPY to file")));
|
||||
|
||||
cstate->filename = pstrdup(filename);
|
||||
oumask = umask(S_IWGRP | S_IWOTH);
|
||||
cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
|
||||
umask(oumask);
|
||||
cstate->is_program = is_program;
|
||||
|
||||
if (cstate->copy_file == NULL)
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not open file \"%s\" for writing: %m",
|
||||
cstate->filename)));
|
||||
if (is_program)
|
||||
{
|
||||
cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
|
||||
if (cstate->copy_file == NULL)
|
||||
ereport(ERROR,
|
||||
(errmsg("could not execute command \"%s\": %m",
|
||||
cstate->filename)));
|
||||
}
|
||||
else
|
||||
{
|
||||
mode_t oumask; /* Pre-existing umask value */
|
||||
struct stat st;
|
||||
|
||||
fstat(fileno(cstate->copy_file), &st);
|
||||
if (S_ISDIR(st.st_mode))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
errmsg("\"%s\" is a directory", cstate->filename)));
|
||||
/*
|
||||
* Prevent write to relative path ... too easy to shoot oneself in
|
||||
* the foot by overwriting a database file ...
|
||||
*/
|
||||
if (!is_absolute_path(filename))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_NAME),
|
||||
errmsg("relative path not allowed for COPY to file")));
|
||||
|
||||
oumask = umask(S_IWGRP | S_IWOTH);
|
||||
cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
|
||||
umask(oumask);
|
||||
if (cstate->copy_file == NULL)
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not open file \"%s\" for writing: %m",
|
||||
cstate->filename)));
|
||||
|
||||
fstat(fileno(cstate->copy_file), &st);
|
||||
if (S_ISDIR(st.st_mode))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
errmsg("\"%s\" is a directory", cstate->filename)));
|
||||
}
|
||||
}
|
||||
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
@ -2317,6 +2398,7 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
|
||||
CopyState
|
||||
BeginCopyFrom(Relation rel,
|
||||
const char *filename,
|
||||
bool is_program,
|
||||
List *attnamelist,
|
||||
List *options)
|
||||
{
|
||||
@ -2413,9 +2495,11 @@ BeginCopyFrom(Relation rel,
|
||||
cstate->defexprs = defexprs;
|
||||
cstate->volatile_defexprs = volatile_defexprs;
|
||||
cstate->num_defaults = num_defaults;
|
||||
cstate->is_program = is_program;
|
||||
|
||||
if (pipe)
|
||||
{
|
||||
Assert(!is_program); /* the grammar does not allow this */
|
||||
if (whereToSendOutput == DestRemote)
|
||||
ReceiveCopyBegin(cstate);
|
||||
else
|
||||
@ -2423,22 +2507,33 @@ BeginCopyFrom(Relation rel,
|
||||
}
|
||||
else
|
||||
{
|
||||
struct stat st;
|
||||
|
||||
cstate->filename = pstrdup(filename);
|
||||
cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
|
||||
|
||||
if (cstate->copy_file == NULL)
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not open file \"%s\" for reading: %m",
|
||||
cstate->filename)));
|
||||
if (cstate->is_program)
|
||||
{
|
||||
cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
|
||||
if (cstate->copy_file == NULL)
|
||||
ereport(ERROR,
|
||||
(errmsg("could not execute command \"%s\": %m",
|
||||
cstate->filename)));
|
||||
}
|
||||
else
|
||||
{
|
||||
struct stat st;
|
||||
|
||||
fstat(fileno(cstate->copy_file), &st);
|
||||
if (S_ISDIR(st.st_mode))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
errmsg("\"%s\" is a directory", cstate->filename)));
|
||||
cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
|
||||
if (cstate->copy_file == NULL)
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not open file \"%s\" for reading: %m",
|
||||
cstate->filename)));
|
||||
|
||||
fstat(fileno(cstate->copy_file), &st);
|
||||
if (S_ISDIR(st.st_mode))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
errmsg("\"%s\" is a directory", cstate->filename)));
|
||||
}
|
||||
}
|
||||
|
||||
if (!cstate->binary)
|
||||
|
Reference in New Issue
Block a user