1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-25 13:17:41 +03:00

aio: Implement support for reads in smgr/md/fd

This implements the following:

1) An smgr AIO target, for AIO on smgr files. This should be usable not just
   for md.c but also for other SMGR implementations, if we ever get them.
2) readv support in fd.c, which requires a small bit of infrastructure work in
   fd.c
3) smgr.c and md.c support for readv

There still is nothing performing AIO, but as of this commit it would be
possible.

As part of this change FileGetRawDesc() actually ensures that the file is
opened - previously it was basically not usable. It's used to reopen a file in
IO workers.

Reviewed-by: Noah Misch <noah@leadboat.com>
Discussion: https://postgr.es/m/uvrtrknj4kdytuboidbhwclo4gxhswwcpgadptsjvjqcluzmah%40brqs62irg4dt
Discussion: https://postgr.es/m/20210223100344.llw5an2aklengrmn@alap3.anarazel.de
Discussion: https://postgr.es/m/stj36ea6yyhoxtqkhpieia2z4krnam7qyetc57rfezgk4zgapf@gcnactj4z56m
This commit is contained in:
Andres Freund
2025-03-29 13:38:35 -04:00
parent dee8002468
commit 50cb7505b3
10 changed files with 437 additions and 4 deletions

View File

@@ -18,6 +18,7 @@
#include "miscadmin.h" #include "miscadmin.h"
#include "storage/aio.h" #include "storage/aio.h"
#include "storage/aio_internal.h" #include "storage/aio_internal.h"
#include "storage/md.h"
/* just to have something to put into aio_handle_cbs */ /* just to have something to put into aio_handle_cbs */
@@ -37,6 +38,8 @@ typedef struct PgAioHandleCallbacksEntry
static const PgAioHandleCallbacksEntry aio_handle_cbs[] = { static const PgAioHandleCallbacksEntry aio_handle_cbs[] = {
#define CALLBACK_ENTRY(id, callback) [id] = {.cb = &callback, .name = #callback} #define CALLBACK_ENTRY(id, callback) [id] = {.cb = &callback, .name = #callback}
CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb), CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb),
CALLBACK_ENTRY(PGAIO_HCB_MD_READV, aio_md_readv_cb),
#undef CALLBACK_ENTRY #undef CALLBACK_ENTRY
}; };

View File

@@ -16,6 +16,7 @@
#include "storage/aio.h" #include "storage/aio.h"
#include "storage/aio_internal.h" #include "storage/aio_internal.h"
#include "storage/smgr.h"
/* /*
@@ -25,6 +26,7 @@ static const PgAioTargetInfo *pgaio_target_info[] = {
[PGAIO_TID_INVALID] = &(PgAioTargetInfo) { [PGAIO_TID_INVALID] = &(PgAioTargetInfo) {
.name = "invalid", .name = "invalid",
}, },
[PGAIO_TID_SMGR] = &aio_smgr_target_info,
}; };

View File

@@ -94,6 +94,7 @@
#include "miscadmin.h" #include "miscadmin.h"
#include "pgstat.h" #include "pgstat.h"
#include "postmaster/startup.h" #include "postmaster/startup.h"
#include "storage/aio.h"
#include "storage/fd.h" #include "storage/fd.h"
#include "storage/ipc.h" #include "storage/ipc.h"
#include "utils/guc.h" #include "utils/guc.h"
@@ -1296,6 +1297,8 @@ LruDelete(File file)
vfdP = &VfdCache[file]; vfdP = &VfdCache[file];
pgaio_closing_fd(vfdP->fd);
/* /*
* Close the file. We aren't expecting this to fail; if it does, better * Close the file. We aren't expecting this to fail; if it does, better
* to leak the FD than to mess up our internal state. * to leak the FD than to mess up our internal state.
@@ -1989,6 +1992,8 @@ FileClose(File file)
if (!FileIsNotOpen(file)) if (!FileIsNotOpen(file))
{ {
pgaio_closing_fd(vfdP->fd);
/* close the file */ /* close the file */
if (close(vfdP->fd) != 0) if (close(vfdP->fd) != 0)
{ {
@@ -2212,6 +2217,32 @@ retry:
return returnCode; return returnCode;
} }
/*
 * FileStartReadV() -- start an asynchronous vectored read on a virtual File.
 *
 *	ioh				AIO handle the read is started on
 *	file			virtual file to read from
 *	iovcnt			iovec count, passed through to pgaio_io_start_readv()
 *	offset			file offset, passed through to pgaio_io_start_readv()
 *	wait_event_info	not referenced in this function's body
 *
 * Returns 0 once the read has been handed to pgaio_io_start_readv().  If
 * FileAccess() fails, its negative result is returned instead and no IO is
 * started.
 */
int
FileStartReadV(PgAioHandle *ioh, File file,
			   int iovcnt, off_t offset,
			   uint32 wait_event_info)
{
	int			rc;

	Assert(FileIsValid(file));

	DO_DB(elog(LOG, "FileStartReadV: %d (%s) " INT64_FORMAT " %d",
			   file, VfdCache[file].fileName,
			   (int64) offset,
			   iovcnt));

	rc = FileAccess(file);
	if (rc >= 0)
	{
		/* look up the descriptor only after FileAccess() has succeeded */
		pgaio_io_start_readv(ioh, VfdCache[file].fd, iovcnt, offset);
		rc = 0;
	}

	return rc;
}
ssize_t ssize_t
FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset, FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset,
uint32 wait_event_info) uint32 wait_event_info)
@@ -2500,6 +2531,12 @@ FilePathName(File file)
int int
FileGetRawDesc(File file) FileGetRawDesc(File file)
{ {
int returnCode;
returnCode = FileAccess(file);
if (returnCode < 0)
return returnCode;
Assert(FileIsValid(file)); Assert(FileIsValid(file));
return VfdCache[file].fd; return VfdCache[file].fd;
} }
@@ -2780,6 +2817,7 @@ FreeDesc(AllocateDesc *desc)
result = closedir(desc->desc.dir); result = closedir(desc->desc.dir);
break; break;
case AllocateDescRawFD: case AllocateDescRawFD:
pgaio_closing_fd(desc->desc.fd);
result = close(desc->desc.fd); result = close(desc->desc.fd);
break; break;
default: default:
@@ -2848,6 +2886,8 @@ CloseTransientFile(int fd)
/* Only get here if someone passes us a file not in allocatedDescs */ /* Only get here if someone passes us a file not in allocatedDescs */
elog(WARNING, "fd passed to CloseTransientFile was not obtained from OpenTransientFile"); elog(WARNING, "fd passed to CloseTransientFile was not obtained from OpenTransientFile");
pgaio_closing_fd(fd);
return close(fd); return close(fd);
} }

View File

@@ -31,6 +31,7 @@
#include "miscadmin.h" #include "miscadmin.h"
#include "pg_trace.h" #include "pg_trace.h"
#include "pgstat.h" #include "pgstat.h"
#include "storage/aio.h"
#include "storage/bufmgr.h" #include "storage/bufmgr.h"
#include "storage/fd.h" #include "storage/fd.h"
#include "storage/md.h" #include "storage/md.h"
@@ -152,6 +153,15 @@ static MdfdVec *_mdfd_getseg(SMgrRelation reln, ForkNumber forknum,
static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum, static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum,
MdfdVec *seg); MdfdVec *seg);
/* AIO callbacks for reads started by mdstartreadv() */
static PgAioResult md_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data);
static void md_readv_report(PgAioResult result, const PgAioTargetData *target_data, int elevel);
/*
 * Callback set for md.c reads.  It is entered into aio_handle_cbs under
 * PGAIO_HCB_MD_READV, which is the ID mdstartreadv() registers on its IO
 * handle.
 */
const PgAioHandleCallbacks aio_md_readv_cb = {
.complete_shared = md_readv_complete,
.report = md_readv_report,
};
static inline int static inline int
_mdfd_open_flags(void) _mdfd_open_flags(void)
{ {
@@ -937,6 +947,69 @@ mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
} }
} }
/*
 * mdstartreadv() -- Asynchronous version of mdreadv().
 *
 * Sets up the IO handle `ioh` to read `nblocks` blocks, starting at
 * `blocknum` of fork `forknum`, into `buffers`, and then starts the read with
 * FileStartReadV().  The requested range has to lie within one segment file;
 * a range crossing a segment boundary raises an ERROR, as does a failure to
 * start the read.
 */
void
mdstartreadv(PgAioHandle *ioh,
			 SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
			 void **buffers, BlockNumber nblocks)
{
	/* position of the first block within its segment file, in blocks */
	BlockNumber segblock = blocknum % ((BlockNumber) RELSEG_SIZE);
	MdfdVec    *seg;
	off_t		seekpos;
	struct iovec *iov;
	int			iovcnt;
	int			rc;

	seg = _mdfd_getseg(reln, forknum, blocknum, false,
					   EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);

	seekpos = (off_t) BLCKSZ * segblock;
	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);

	/* more blocks requested than remain in this segment? */
	if (nblocks > RELSEG_SIZE - segblock)
		elog(ERROR, "read crossing segment boundary");

	/* fill the handle's iovec array from the caller's buffers */
	iovcnt = pgaio_io_get_iovec(ioh, &iov);
	Assert(nblocks <= iovcnt);

	iovcnt = buffers_to_iovec(iov, buffers, nblocks);
	Assert(iovcnt <= nblocks);

	/* flag the IO as buffered unless direct IO is enabled for data files */
	if ((io_direct_flags & IO_DIRECT_DATA) == 0)
		pgaio_io_set_flag(ioh, PGAIO_HF_BUFFERED);

	/* describe the target and register the callbacks, then start the read */
	pgaio_io_set_target_smgr(ioh, reln, forknum, blocknum, nblocks, false);
	pgaio_io_register_callbacks(ioh, PGAIO_HCB_MD_READV, 0);

	rc = FileStartReadV(ioh, seg->mdfd_vfd, iovcnt, seekpos,
						WAIT_EVENT_DATA_FILE_READ);
	if (rc != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not start reading blocks %u..%u in file \"%s\": %m",
						blocknum,
						blocknum + nblocks - 1,
						FilePathName(seg->mdfd_vfd))));

	/*
	 * The checks that mdreadv() performs after its read are done in
	 * md_readv_complete() for the asynchronous case.
	 */
}
/* /*
* mdwritev() -- Write the supplied blocks at the appropriate location. * mdwritev() -- Write the supplied blocks at the appropriate location.
* *
@@ -1365,6 +1438,21 @@ mdimmedsync(SMgrRelation reln, ForkNumber forknum)
} }
} }
/*
 * mdfd() -- return the raw file descriptor of the segment file containing
 * `blocknum`, and store that block's byte offset within the segment in *off.
 *
 * The return value is whatever FileGetRawDesc() returns for the segment's
 * virtual file.
 *
 * NOTE(review): *off is a uint32 while the offset is computed as off_t; if
 * BLCKSZ * RELSEG_SIZE can exceed 32 bits the value would be truncated before
 * the Assert sees it -- confirm the configured bounds.
 */
int
mdfd(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off)
{
	MdfdVec    *seg;

	/* the pointer returned here is not used; the call itself is kept */
	(void) mdopenfork(reln, forknum, EXTENSION_FAIL);

	seg = _mdfd_getseg(reln, forknum, blocknum, false,
					   EXTENSION_FAIL);

	*off = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
	Assert(*off < (off_t) BLCKSZ * RELSEG_SIZE);

	return FileGetRawDesc(seg->mdfd_vfd);
}
/* /*
* register_dirty_segment() -- Mark a relation segment as needing fsync * register_dirty_segment() -- Mark a relation segment as needing fsync
* *
@@ -1841,3 +1929,111 @@ mdfiletagmatches(const FileTag *ftag, const FileTag *candidate)
*/ */
return ftag->rlocator.dbOid == candidate->rlocator.dbOid; return ftag->rlocator.dbOid == candidate->rlocator.dbOid;
} }
/*
 * AIO completion callback for mdstartreadv().
 *
 * Converts the raw result of the read (negative errno, or number of bytes
 * read) into a block-based PgAioResult:
 * - negative result: PGAIO_RS_ERROR, errno stored in error_data
 * - less than one full block read: PGAIO_RS_ERROR with error_data == 0
 * - fewer blocks than requested: PGAIO_RS_PARTIAL
 * - otherwise the prior status is kept, with result holding the block count
 */
static PgAioResult
md_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data)
{
	PgAioTargetData *target = pgaio_io_get_target_data(ioh);
	PgAioResult res = prior_result;

	if (prior_result.result < 0)
	{
		/* "hard" error: keep the error number around in error_data */
		res.error_data = -prior_result.result;
		res.result = 0;
		res.status = PGAIO_RS_ERROR;
		res.id = PGAIO_HCB_MD_READV;

		/*
		 * Log the failure right away, but only to the server log.  The
		 * issuer of the IO may look at the result much later (it can be busy
		 * with other work) or never (e.g. when it was cancelled, or errored
		 * out because another IO failed as well).  When the issuer does
		 * process the result, it raises the ERROR itself.
		 */
		pgaio_result_report(res, target, LOG_SERVER_ONLY);

		return res;
	}

	/*
	 * The smgr API works in units of blocks rather than bytes (see the
	 * comment above smgrstartreadv()), so convert the byte count.
	 */
	res.result /= BLCKSZ;

	Assert(res.result <= target->smgr.nblocks);

	if (res.result == 0)
	{
		/* not even one block was read: treat that as a failure */
		res.error_data = 0;
		res.status = PGAIO_RS_ERROR;
		res.id = PGAIO_HCB_MD_READV;

		/* logged immediately, for the same reason as hard errors above */
		pgaio_result_report(res, target, LOG_SERVER_ONLY);

		return res;
	}

	if (res.status != PGAIO_RS_ERROR &&
		res.result < target->smgr.nblocks)
	{
		/* short read; retrying is left to the upper level */
		res.status = PGAIO_RS_PARTIAL;
		res.id = PGAIO_HCB_MD_READV;
	}

	return res;
}
/*
 * AIO error reporting callback for mdstartreadv().
 *
 * The two failure modes are told apart via PgAioResult.error_data:
 * - error_data != 0: the IO failed, error_data holds the errno
 * - error_data == 0: the IO read less data than requested
 *
 * The message is emitted at `elevel`, naming the block range and file taken
 * from the target data.
 */
static void
md_readv_report(PgAioResult result, const PgAioTargetData *td, int elevel)
{
	RelPathStr	relpath;
	BlockNumber first_block = td->smgr.blockNum;
	BlockNumber last_block = td->smgr.blockNum + td->smgr.nblocks - 1;

	relpath = relpathbackend(td->smgr.rlocator,
							 td->smgr.is_temp ? MyProcNumber : INVALID_PROC_NUMBER,
							 td->smgr.forkNum);

	if (result.error_data == 0)
	{
		/*
		 * NB: Usually this only shows up in debug messages, while a partial
		 * IO is being retried.
		 */
		ereport(elevel,
				errcode(ERRCODE_DATA_CORRUPTED),
				errmsg("could not read blocks %u..%u in file \"%s\": read only %zu of %zu bytes",
					   first_block,
					   last_block,
					   relpath.str,
					   result.result * (size_t) BLCKSZ,
					   td->smgr.nblocks * (size_t) BLCKSZ));
		return;
	}

	/* errcode_for_file_access() and %m both read errno */
	errno = result.error_data;

	ereport(elevel,
			errcode_for_file_access(),
			errmsg("could not read blocks %u..%u in file \"%s\": %m",
				   first_block,
				   last_block,
				   relpath.str));
}

View File

@@ -66,6 +66,7 @@
#include "access/xlogutils.h" #include "access/xlogutils.h"
#include "lib/ilist.h" #include "lib/ilist.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "storage/aio.h"
#include "storage/bufmgr.h" #include "storage/bufmgr.h"
#include "storage/ipc.h" #include "storage/ipc.h"
#include "storage/md.h" #include "storage/md.h"
@@ -106,6 +107,10 @@ typedef struct f_smgr
void (*smgr_readv) (SMgrRelation reln, ForkNumber forknum, void (*smgr_readv) (SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, BlockNumber blocknum,
void **buffers, BlockNumber nblocks); void **buffers, BlockNumber nblocks);
void (*smgr_startreadv) (PgAioHandle *ioh,
SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum,
void **buffers, BlockNumber nblocks);
void (*smgr_writev) (SMgrRelation reln, ForkNumber forknum, void (*smgr_writev) (SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, BlockNumber blocknum,
const void **buffers, BlockNumber nblocks, const void **buffers, BlockNumber nblocks,
@@ -117,6 +122,7 @@ typedef struct f_smgr
BlockNumber old_blocks, BlockNumber nblocks); BlockNumber old_blocks, BlockNumber nblocks);
void (*smgr_immedsync) (SMgrRelation reln, ForkNumber forknum); void (*smgr_immedsync) (SMgrRelation reln, ForkNumber forknum);
void (*smgr_registersync) (SMgrRelation reln, ForkNumber forknum); void (*smgr_registersync) (SMgrRelation reln, ForkNumber forknum);
int (*smgr_fd) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off);
} f_smgr; } f_smgr;
static const f_smgr smgrsw[] = { static const f_smgr smgrsw[] = {
@@ -134,12 +140,14 @@ static const f_smgr smgrsw[] = {
.smgr_prefetch = mdprefetch, .smgr_prefetch = mdprefetch,
.smgr_maxcombine = mdmaxcombine, .smgr_maxcombine = mdmaxcombine,
.smgr_readv = mdreadv, .smgr_readv = mdreadv,
.smgr_startreadv = mdstartreadv,
.smgr_writev = mdwritev, .smgr_writev = mdwritev,
.smgr_writeback = mdwriteback, .smgr_writeback = mdwriteback,
.smgr_nblocks = mdnblocks, .smgr_nblocks = mdnblocks,
.smgr_truncate = mdtruncate, .smgr_truncate = mdtruncate,
.smgr_immedsync = mdimmedsync, .smgr_immedsync = mdimmedsync,
.smgr_registersync = mdregistersync, .smgr_registersync = mdregistersync,
.smgr_fd = mdfd,
} }
}; };
@@ -157,6 +165,16 @@ static dlist_head unpinned_relns;
static void smgrshutdown(int code, Datum arg); static void smgrshutdown(int code, Datum arg);
static void smgrdestroy(SMgrRelation reln); static void smgrdestroy(SMgrRelation reln);
/* callbacks of the smgr AIO target */
static void smgr_aio_reopen(PgAioHandle *ioh);
static char *smgr_aio_describe_identity(const PgAioTargetData *sd);
/*
 * Description of the smgr AIO target.  It is entered into pgaio_target_info
 * under PGAIO_TID_SMGR, the target ID that pgaio_io_set_target_smgr() sets on
 * an IO handle.
 */
const PgAioTargetInfo aio_smgr_target_info = {
.name = "smgr",
.reopen = smgr_aio_reopen,
.describe_identity = smgr_aio_describe_identity,
};
/* /*
* smgrinit(), smgrshutdown() -- Initialize or shut down storage * smgrinit(), smgrshutdown() -- Initialize or shut down storage
@@ -709,6 +727,30 @@ smgrreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
RESUME_INTERRUPTS(); RESUME_INTERRUPTS();
} }
/*
 * smgrstartreadv() -- asynchronous version of smgrreadv()
 *
 * Starts an asynchronous readv IO on the IO handle `ioh`.  Apart from `ioh`,
 * the parameters are the same as for smgrreadv().
 *
 * If the read succeeds, fully or partially, completion callbacks above smgr
 * get the result as the number of blocks read successfully.  Buffers of
 * blocks that were not read successfully may carry unspecified modifications,
 * up to the full nblocks.  Reporting blocks keeps up the abstraction that
 * smgr works on blocks rather than bytes.
 */
void
smgrstartreadv(PgAioHandle *ioh,
			   SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
			   void **buffers, BlockNumber nblocks)
{
	const f_smgr *impl = &smgrsw[reln->smgr_which];

	/* interrupts stay held for the duration of the implementation's call */
	HOLD_INTERRUPTS();
	impl->smgr_startreadv(ioh, reln, forknum, blocknum, buffers, nblocks);
	RESUME_INTERRUPTS();
}
/* /*
* smgrwritev() -- Write the supplied buffers out. * smgrwritev() -- Write the supplied buffers out.
* *
@@ -917,6 +959,29 @@ smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
RESUME_INTERRUPTS(); RESUME_INTERRUPTS();
} }
/*
 * smgrfd() -- return the fd for the given block number, and set *off to the
 * matching position.
 *
 * Only meant for the case where AIO has to execute an IO in a process other
 * than the one that issued it (e.g. in an IO worker).
 */
static int
smgrfd(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off)
{
	/*
	 * Interrupt processing must already be prevented by the caller;
	 * otherwise the FD could get closed prematurely.
	 */
	Assert(!INTERRUPTS_CAN_BE_PROCESSED());

	return smgrsw[reln->smgr_which].smgr_fd(reln, forknum, blocknum, off);
}
/* /*
* AtEOXact_SMgr * AtEOXact_SMgr
* *
@@ -945,3 +1010,99 @@ ProcessBarrierSmgrRelease(void)
smgrreleaseall(); smgrreleaseall();
return true; return true;
} }
/*
 * Set target of the IO handle to be smgr and initialize all the relevant
 * pieces of data.
 *
 *	ioh			IO handle to set up
 *	smgr		relation the IO is for
 *	forknum		fork the IO is for
 *	blocknum	first block of the IO
 *	nblocks		number of blocks covered by the IO
 *	skip_fsync	whether the caller wants fsync to be skipped for this IO
 *
 * For temporary relations skip_fsync is always recorded as true, whatever
 * the caller passed in.
 */
void
pgaio_io_set_target_smgr(PgAioHandle *ioh,
						 SMgrRelationData *smgr,
						 ForkNumber forknum,
						 BlockNumber blocknum,
						 int nblocks,
						 bool skip_fsync)
{
	PgAioTargetData *sd = pgaio_io_get_target_data(ioh);
	bool		is_temp = SmgrIsTemp(smgr);

	pgaio_io_set_target(ioh, PGAIO_TID_SMGR);

	/* backend is implied via IO owner */
	sd->smgr.rlocator = smgr->smgr_rlocator.locator;
	sd->smgr.forkNum = forknum;
	sd->smgr.blockNum = blocknum;
	sd->smgr.nblocks = nblocks;
	sd->smgr.is_temp = is_temp;

	/*
	 * Temp relations should never be fsync'd, so force skip_fsync on for
	 * them.  (The previous expression, skip_fsync && !is_temp, cleared the
	 * flag for temp relations, which would request the fsync instead of
	 * skipping it.)
	 */
	sd->smgr.skip_fsync = skip_fsync || is_temp;
}
/*
 * Callback for the smgr AIO target, to reopen the file (e.g. because the IO
 * is executed in a worker).
 *
 * Opens the relation described by the handle's target data and stores the fd
 * obtained from smgrfd() in the handle's operation data.
 */
static void
smgr_aio_reopen(PgAioHandle *ioh)
{
	PgAioTargetData *target = pgaio_io_get_target_data(ioh);
	PgAioOpData *opdata = pgaio_io_get_op_data(ioh);
	SMgrRelation rel;
	ProcNumber	owner;
	uint32		segoff;
	int			fd;

	/*
	 * Interrupt processing must already be prevented by the caller;
	 * otherwise the FD could be closed again before the IO gets executed.
	 */
	Assert(!INTERRUPTS_CAN_BE_PROCESSED());

	/* for temp relations the owning backend is the owner of the IO */
	owner = target->smgr.is_temp ? pgaio_io_get_owner(ioh) : INVALID_PROC_NUMBER;

	rel = smgropen(target->smgr.rlocator, owner);

	switch (pgaio_io_get_op(ioh))
	{
		case PGAIO_OP_INVALID:
			pg_unreachable();
			break;

		case PGAIO_OP_READV:
			fd = smgrfd(rel, target->smgr.forkNum, target->smgr.blockNum, &segoff);
			opdata->read.fd = fd;
			Assert(segoff == opdata->read.offset);
			break;

		case PGAIO_OP_WRITEV:
			fd = smgrfd(rel, target->smgr.forkNum, target->smgr.blockNum, &segoff);
			opdata->write.fd = fd;
			Assert(segoff == opdata->write.offset);
			break;
	}
}
/*
* Callback for the smgr AIO target, describing the target of the IO.
*/
static char *
smgr_aio_describe_identity(const PgAioTargetData *sd)
{
RelPathStr path;
char *desc;
path = relpathbackend(sd->smgr.rlocator,
sd->smgr.is_temp ?
MyProcNumber : INVALID_PROC_NUMBER,
sd->smgr.forkNum);
if (sd->smgr.nblocks == 0)
desc = psprintf(_("file \"%s\""), path.str);
else if (sd->smgr.nblocks == 1)
desc = psprintf(_("block %u in file \"%s\""),
sd->smgr.blockNum,
path.str);
else
desc = psprintf(_("blocks %u..%u in file \"%s\""),
sd->smgr.blockNum,
sd->smgr.blockNum + sd->smgr.nblocks - 1,
path.str);
return desc;
}

View File

@@ -117,9 +117,10 @@ typedef enum PgAioTargetID
{ {
/* intentionally the zero value, to help catch zeroed memory etc */ /* intentionally the zero value, to help catch zeroed memory etc */
PGAIO_TID_INVALID = 0, PGAIO_TID_INVALID = 0,
PGAIO_TID_SMGR,
} PgAioTargetID; } PgAioTargetID;
#define PGAIO_TID_COUNT (PGAIO_TID_INVALID + 1) #define PGAIO_TID_COUNT (PGAIO_TID_SMGR + 1)
/* /*
@@ -191,6 +192,8 @@ struct PgAioTargetInfo
typedef enum PgAioHandleCallbackID typedef enum PgAioHandleCallbackID
{ {
PGAIO_HCB_INVALID, PGAIO_HCB_INVALID,
PGAIO_HCB_MD_READV,
} PgAioHandleCallbackID; } PgAioHandleCallbackID;

View File

@@ -60,11 +60,15 @@ typedef struct PgAioWaitRef
*/ */
typedef union PgAioTargetData typedef union PgAioTargetData
{ {
/* just as an example placeholder for later */
struct struct
{ {
uint32 queue_id; RelFileLocator rlocator; /* physical relation identifier */
} wal; BlockNumber blockNum; /* blknum relative to begin of reln */
BlockNumber nblocks;
ForkNumber forkNum:8; /* don't waste 4 byte for four values */
bool is_temp:1; /* proc can be inferred by owning AIO */
bool skip_fsync:1;
} smgr;
} PgAioTargetData; } PgAioTargetData;

View File

@@ -101,6 +101,8 @@ extern PGDLLIMPORT int max_safe_fds;
* prototypes for functions in fd.c * prototypes for functions in fd.c
*/ */
struct PgAioHandle;
/* Operations on virtual Files --- equivalent to Unix kernel file ops */ /* Operations on virtual Files --- equivalent to Unix kernel file ops */
extern File PathNameOpenFile(const char *fileName, int fileFlags); extern File PathNameOpenFile(const char *fileName, int fileFlags);
extern File PathNameOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode); extern File PathNameOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode);
@@ -109,6 +111,7 @@ extern void FileClose(File file);
extern int FilePrefetch(File file, off_t offset, off_t amount, uint32 wait_event_info); extern int FilePrefetch(File file, off_t offset, off_t amount, uint32 wait_event_info);
extern ssize_t FileReadV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info); extern ssize_t FileReadV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info);
extern ssize_t FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info); extern ssize_t FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info);
extern int FileStartReadV(struct PgAioHandle *ioh, File file, int iovcnt, off_t offset, uint32 wait_event_info);
extern int FileSync(File file, uint32 wait_event_info); extern int FileSync(File file, uint32 wait_event_info);
extern int FileZero(File file, off_t offset, off_t amount, uint32 wait_event_info); extern int FileZero(File file, off_t offset, off_t amount, uint32 wait_event_info);
extern int FileFallocate(File file, off_t offset, off_t amount, uint32 wait_event_info); extern int FileFallocate(File file, off_t offset, off_t amount, uint32 wait_event_info);

View File

@@ -14,11 +14,14 @@
#ifndef MD_H #ifndef MD_H
#define MD_H #define MD_H
#include "storage/aio_types.h"
#include "storage/block.h" #include "storage/block.h"
#include "storage/relfilelocator.h" #include "storage/relfilelocator.h"
#include "storage/smgr.h" #include "storage/smgr.h"
#include "storage/sync.h" #include "storage/sync.h"
extern const PgAioHandleCallbacks aio_md_readv_cb;
/* md storage manager functionality */ /* md storage manager functionality */
extern void mdinit(void); extern void mdinit(void);
extern void mdopen(SMgrRelation reln); extern void mdopen(SMgrRelation reln);
@@ -36,6 +39,9 @@ extern uint32 mdmaxcombine(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum); BlockNumber blocknum);
extern void mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, extern void mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
void **buffers, BlockNumber nblocks); void **buffers, BlockNumber nblocks);
extern void mdstartreadv(PgAioHandle *ioh,
SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
void **buffers, BlockNumber nblocks);
extern void mdwritev(SMgrRelation reln, ForkNumber forknum, extern void mdwritev(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, BlockNumber blocknum,
const void **buffers, BlockNumber nblocks, bool skipFsync); const void **buffers, BlockNumber nblocks, bool skipFsync);
@@ -46,6 +52,7 @@ extern void mdtruncate(SMgrRelation reln, ForkNumber forknum,
BlockNumber old_blocks, BlockNumber nblocks); BlockNumber old_blocks, BlockNumber nblocks);
extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum); extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum);
extern void mdregistersync(SMgrRelation reln, ForkNumber forknum); extern void mdregistersync(SMgrRelation reln, ForkNumber forknum);
extern int mdfd(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off);
extern void ForgetDatabaseSyncRequests(Oid dbid); extern void ForgetDatabaseSyncRequests(Oid dbid);
extern void DropRelationFiles(RelFileLocator *delrels, int ndelrels, bool isRedo); extern void DropRelationFiles(RelFileLocator *delrels, int ndelrels, bool isRedo);

View File

@@ -15,6 +15,7 @@
#define SMGR_H #define SMGR_H
#include "lib/ilist.h" #include "lib/ilist.h"
#include "storage/aio_types.h"
#include "storage/block.h" #include "storage/block.h"
#include "storage/relfilelocator.h" #include "storage/relfilelocator.h"
@@ -73,6 +74,8 @@ typedef SMgrRelationData *SMgrRelation;
#define SmgrIsTemp(smgr) \ #define SmgrIsTemp(smgr) \
RelFileLocatorBackendIsTemp((smgr)->smgr_rlocator) RelFileLocatorBackendIsTemp((smgr)->smgr_rlocator)
extern const PgAioTargetInfo aio_smgr_target_info;
extern void smgrinit(void); extern void smgrinit(void);
extern SMgrRelation smgropen(RelFileLocator rlocator, ProcNumber backend); extern SMgrRelation smgropen(RelFileLocator rlocator, ProcNumber backend);
extern bool smgrexists(SMgrRelation reln, ForkNumber forknum); extern bool smgrexists(SMgrRelation reln, ForkNumber forknum);
@@ -97,6 +100,10 @@ extern uint32 smgrmaxcombine(SMgrRelation reln, ForkNumber forknum,
extern void smgrreadv(SMgrRelation reln, ForkNumber forknum, extern void smgrreadv(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, BlockNumber blocknum,
void **buffers, BlockNumber nblocks); void **buffers, BlockNumber nblocks);
extern void smgrstartreadv(PgAioHandle *ioh,
SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum,
void **buffers, BlockNumber nblocks);
extern void smgrwritev(SMgrRelation reln, ForkNumber forknum, extern void smgrwritev(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, BlockNumber blocknum,
const void **buffers, BlockNumber nblocks, const void **buffers, BlockNumber nblocks,
@@ -127,4 +134,11 @@ smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
smgrwritev(reln, forknum, blocknum, &buffer, 1, skipFsync); smgrwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
} }
extern void pgaio_io_set_target_smgr(PgAioHandle *ioh,
SMgrRelationData *smgr,
ForkNumber forknum,
BlockNumber blocknum,
int nblocks,
bool skip_fsync);
#endif /* SMGR_H */ #endif /* SMGR_H */