1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-07-30 19:03:16 +03:00

Add new sessions function sqlite3changeset_apply_v3() and its streaming equivalent. This allows changesets to be filtered on a per-change basis, not just per-table.

FossilOrigin-Name: 10ebd7a119ef1985755ef143a941fbaed1b5ca1c8a71e011c8bbc70e383fd337
This commit is contained in:
dan
2025-07-14 14:51:43 +00:00
parent f8addcf937
commit 08f8111bd8
6 changed files with 451 additions and 129 deletions

View File

@ -5182,6 +5182,10 @@ static int sessionChangesetApply(
void *pCtx, /* Copy of sixth arg to _apply() */
const char *zTab /* Table name */
),
int(*xFilterIter)(
void *pCtx, /* Copy of sixth arg to _apply() */
sqlite3_changeset_iter *p
),
int(*xConflict)(
void *pCtx, /* Copy of fifth arg to _apply() */
int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */
@ -5322,6 +5326,9 @@ static int sessionChangesetApply(
** next change. A log message has already been issued. */
if( schemaMismatch ) continue;
/* If this is a call to apply_v3(), invoke xFilterIter here. */
if( xFilterIter && 0==xFilterIter(pCtx, pIter) ) continue;
rc = sessionApplyOneWithRetry(db, pIter, &sApply, xConflict, pCtx);
}
@ -5389,6 +5396,87 @@ static int sessionChangesetApply(
return rc;
}
/*
** This function is called by all six sqlite3changeset_apply() variants:
**
** + sqlite3changeset_apply()
** + sqlite3changeset_apply_v2()
** + sqlite3changeset_apply_v3()
** + sqlite3changeset_apply_strm()
** + sqlite3changeset_apply_strm_v2()
** + sqlite3changeset_apply_strm_v3()
**
** Arguments passed to this function are as follows:
**
** db:
** Database handle to apply changeset to main database of.
**
** nChangeset/pChangeset:
** These are both passed zero for the streaming variants. For the normal
** apply() functions, these are passed the size of and the buffer containing
** the changeset, respectively.
**
** xInput/pIn:
** These are both passed zero for the normal variants. For the streaming
** apply() functions, these are passed the input callback and context
** pointer, respectively.
**
** xFilter:
** The filter function as passed to apply() or apply_v2() (to filter by
** table name), if any. This is always NULL for apply_v3() calls.
**
** xFilterIter:
** The filter function as passed to apply_v3(), if any.
**
** xConflict:
** The conflict handler callback (must not be NULL).
**
** pCtx:
** The context pointer passed to the xFilter and xConflict handler callbacks.
**
** ppRebase, pnRebase:
** Zero for apply(). The rebase changeset output pointers, if any, for
** apply_v2() and apply_v3().
**
** flags:
** Zero for apply(). The flags parameter for apply_v2() and apply_v3().
*/
static int sessionChangesetApplyV23(
sqlite3 *db, /* Apply change to "main" db of this handle */
int nChangeset, /* Size of changeset in bytes */
void *pChangeset, /* Changeset blob */
int (*xInput)(void *pIn, void *pData, int *pnData), /* Input function */
void *pIn, /* First arg for xInput */
int(*xFilter)(
void *pCtx, /* Copy of sixth arg to _apply() */
const char *zTab /* Table name */
),
int(*xFilterIter)(
void *pCtx, /* Copy of sixth arg to _apply() */
sqlite3_changeset_iter *p /* Handle describing current change */
),
int(*xConflict)(
void *pCtx, /* Copy of sixth arg to _apply() */
int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */
sqlite3_changeset_iter *p /* Handle describing change and conflict */
),
void *pCtx, /* First argument passed to xConflict */
void **ppRebase, int *pnRebase,
int flags
){
sqlite3_changeset_iter *pIter; /* Iterator to skip through changeset */
int bInverse = !!(flags & SQLITE_CHANGESETAPPLY_INVERT);
int rc = sessionChangesetStart(
&pIter, xInput, pIn, nChangeset, pChangeset, bInverse, 1
);
if( rc==SQLITE_OK ){
rc = sessionChangesetApply(db, pIter,
xFilter, xFilterIter, xConflict, pCtx, ppRebase, pnRebase, flags
);
}
return rc;
}
/*
** Apply the changeset passed via pChangeset/nChangeset to the main
** database attached to handle "db".
@ -5410,17 +5498,39 @@ int sqlite3changeset_apply_v2(
void **ppRebase, int *pnRebase,
int flags
){
sqlite3_changeset_iter *pIter; /* Iterator to skip through changeset */
int bInv = !!(flags & SQLITE_CHANGESETAPPLY_INVERT);
int rc = sessionChangesetStart(&pIter, 0, 0, nChangeset, pChangeset, bInv, 1);
return sessionChangesetApplyV23(db,
nChangeset, pChangeset, 0, 0,
xFilter, 0, xConflict, pCtx,
ppRebase, pnRebase, flags
);
}
if( rc==SQLITE_OK ){
rc = sessionChangesetApply(
db, pIter, xFilter, xConflict, pCtx, ppRebase, pnRebase, flags
);
}
return rc;
/*
** Apply the changeset passed via pChangeset/nChangeset to the main
** database attached to handle "db".
*/
int sqlite3changeset_apply_v3(
sqlite3 *db, /* Apply change to "main" db of this handle */
int nChangeset, /* Size of changeset in bytes */
void *pChangeset, /* Changeset blob */
int(*xFilter)(
void *pCtx, /* Copy of sixth arg to _apply() */
sqlite3_changeset_iter *p /* Handle describing current change */
),
int(*xConflict)(
void *pCtx, /* Copy of sixth arg to _apply() */
int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */
sqlite3_changeset_iter *p /* Handle describing change and conflict */
),
void *pCtx, /* First argument passed to xConflict */
void **ppRebase, int *pnRebase,
int flags
){
return sessionChangesetApplyV23(db,
nChangeset, pChangeset, 0, 0,
0, xFilter, xConflict, pCtx,
ppRebase, pnRebase, flags
);
}
/*
@ -5443,8 +5553,10 @@ int sqlite3changeset_apply(
),
void *pCtx /* First argument passed to xConflict */
){
return sqlite3changeset_apply_v2(
db, nChangeset, pChangeset, xFilter, xConflict, pCtx, 0, 0, 0
return sessionChangesetApplyV23(db,
nChangeset, pChangeset, 0, 0,
xFilter, 0, xConflict, pCtx,
0, 0, 0
);
}
@ -5453,6 +5565,29 @@ int sqlite3changeset_apply(
** attached to handle "db". Invoke the supplied conflict handler callback
** to resolve any conflicts encountered while applying the change.
*/
int sqlite3changeset_apply_v3_strm(
sqlite3 *db, /* Apply change to "main" db of this handle */
int (*xInput)(void *pIn, void *pData, int *pnData), /* Input function */
void *pIn, /* First arg for xInput */
int(*xFilter)(
void *pCtx, /* Copy of sixth arg to _apply() */
sqlite3_changeset_iter *p
),
int(*xConflict)(
void *pCtx, /* Copy of sixth arg to _apply() */
int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */
sqlite3_changeset_iter *p /* Handle describing change and conflict */
),
void *pCtx, /* First argument passed to xConflict */
void **ppRebase, int *pnRebase,
int flags
){
return sessionChangesetApplyV23(db,
0, 0, xInput, pIn,
0, xFilter, xConflict, pCtx,
ppRebase, pnRebase, flags
);
}
int sqlite3changeset_apply_v2_strm(
sqlite3 *db, /* Apply change to "main" db of this handle */
int (*xInput)(void *pIn, void *pData, int *pnData), /* Input function */
@ -5470,15 +5605,11 @@ int sqlite3changeset_apply_v2_strm(
void **ppRebase, int *pnRebase,
int flags
){
sqlite3_changeset_iter *pIter; /* Iterator to skip through changeset */
int bInverse = !!(flags & SQLITE_CHANGESETAPPLY_INVERT);
int rc = sessionChangesetStart(&pIter, xInput, pIn, 0, 0, bInverse, 1);
if( rc==SQLITE_OK ){
rc = sessionChangesetApply(
db, pIter, xFilter, xConflict, pCtx, ppRebase, pnRebase, flags
);
}
return rc;
return sessionChangesetApplyV23(db,
0, 0, xInput, pIn,
xFilter, 0, xConflict, pCtx,
ppRebase, pnRebase, flags
);
}
int sqlite3changeset_apply_strm(
sqlite3 *db, /* Apply change to "main" db of this handle */
@ -5495,8 +5626,10 @@ int sqlite3changeset_apply_strm(
),
void *pCtx /* First argument passed to xConflict */
){
return sqlite3changeset_apply_v2_strm(
db, xInput, pIn, xFilter, xConflict, pCtx, 0, 0, 0
return sessionChangesetApplyV23(db,
0, 0, xInput, pIn,
xFilter, 0, xConflict, pCtx,
0, 0, 0
);
}