1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-07-27 20:41:58 +03:00

Add sqlite3_changeset_apply_v2() and apply_v2_strm() to the sessions module.

FossilOrigin-Name: 445bfe977d9f3a891e08ef33237862ed047fe83e134ef3ed8b47ee0f5abd8cd6
This commit is contained in:
dan
2018-03-13 20:31:23 +00:00
parent 58db4c760f
commit a38e6c57bc
6 changed files with 375 additions and 65 deletions

View File

@ -3410,6 +3410,8 @@ struct SessionApplyCtx {
int bStat1; /* True if table is sqlite_stat1 */
int bDeferConstraints; /* True to defer constraints */
SessionBuffer constraints; /* Deferred constraints are stored here */
SessionBuffer rebase; /* Rebase information (if any) here */
int bRebaseStarted; /* If table header is already in rebase */
};
/*
@ -3791,6 +3793,60 @@ static int sessionSeekToRow(
return rc;
}
static int sessionRebaseAdd(
SessionApplyCtx *p,
int eType,
sqlite3_changeset_iter *pIter
){
int rc = SQLITE_OK;
int i;
int eOp = pIter->op;
if( p->bRebaseStarted==0 ){
/* Append a table-header to the rebase buffer */
const char *zTab = pIter->zTab;
sessionAppendByte(&p->rebase, 'T', &rc);
sessionAppendVarint(&p->rebase, p->nCol, &rc);
sessionAppendBlob(&p->rebase, p->abPK, p->nCol, &rc);
sessionAppendBlob(&p->rebase, (u8*)zTab, (int)strlen(zTab)+1, &rc);
p->bRebaseStarted = 1;
}
assert( eType==SQLITE_CHANGESET_REPLACE||eType==SQLITE_CHANGESET_OMIT );
assert( eOp==SQLITE_DELETE || eOp==SQLITE_INSERT || eOp==SQLITE_UPDATE );
if( eType==SQLITE_CHANGESET_REPLACE ){
sessionAppendByte(&p->rebase, SQLITE_DELETE, &rc);
sessionAppendByte(&p->rebase, 0, &rc);
for(i=0; i<p->nCol; i++){
if( p->abPK[i]==0 ){
sessionAppendByte(&p->rebase, 0, &rc);
}else{
sqlite3_value *pVal = 0;
if( eOp==SQLITE_INSERT ){
sqlite3changeset_new(pIter, i, &pVal);
}else{
sqlite3changeset_old(pIter, i, &pVal);
}
sessionAppendValue(&p->rebase, pVal, &rc);
}
}
}else{
sessionAppendByte(&p->rebase, SQLITE_INSERT, &rc);
sessionAppendByte(&p->rebase, eOp==SQLITE_DELETE, &rc);
for(i=0; i<p->nCol; i++){
sqlite3_value *pVal = 0;
if( eOp!=SQLITE_INSERT && p->abPK[i] ){
sqlite3changeset_old(pIter, i, &pVal);
}else{
sqlite3changeset_new(pIter, i, &pVal);
}
sessionAppendValue(&p->rebase, pVal, &rc);
}
}
return rc;
}
/*
** Invoke the conflict handler for the change that the changeset iterator
** currently points to.
@ -3866,7 +3922,7 @@ static int sessionConflictHandler(
u8 *aBlob = &pIter->in.aData[pIter->in.iCurrent];
int nBlob = pIter->in.iNext - pIter->in.iCurrent;
sessionAppendBlob(&p->constraints, aBlob, nBlob, &rc);
res = SQLITE_CHANGESET_OMIT;
return SQLITE_OK;
}else{
/* No other row with the new.* primary key. */
res = xConflict(pCtx, eType+1, pIter);
@ -3892,6 +3948,9 @@ static int sessionConflictHandler(
rc = SQLITE_MISUSE;
break;
}
if( rc==SQLITE_OK ){
rc = sessionRebaseAdd(p, res, pIter);
}
}
return rc;
@ -4067,42 +4126,42 @@ static int sessionApplyOneWithRetry(
int rc;
rc = sessionApplyOneOp(pIter, pApply, xConflict, pCtx, &bReplace, &bRetry);
assert( rc==SQLITE_OK || (bRetry==0 && bReplace==0) );
/* If the bRetry flag is set, the change has not been applied due to an
** SQLITE_CHANGESET_DATA problem (i.e. this is an UPDATE or DELETE and
** a row with the correct PK is present in the db, but one or more other
** fields do not contain the expected values) and the conflict handler
** returned SQLITE_CHANGESET_REPLACE. In this case retry the operation,
** but pass NULL as the final argument so that sessionApplyOneOp() ignores
** the SQLITE_CHANGESET_DATA problem. */
if( bRetry ){
assert( pIter->op==SQLITE_UPDATE || pIter->op==SQLITE_DELETE );
rc = sessionApplyOneOp(pIter, pApply, xConflict, pCtx, 0, 0);
}
/* If the bReplace flag is set, the change is an INSERT that has not
** been performed because the database already contains a row with the
** specified primary key and the conflict handler returned
** SQLITE_CHANGESET_REPLACE. In this case remove the conflicting row
** before reattempting the INSERT. */
else if( bReplace ){
assert( pIter->op==SQLITE_INSERT );
rc = sqlite3_exec(db, "SAVEPOINT replace_op", 0, 0, 0);
if( rc==SQLITE_OK ){
rc = sessionBindRow(pIter,
sqlite3changeset_new, pApply->nCol, pApply->abPK, pApply->pDelete);
sqlite3_bind_int(pApply->pDelete, pApply->nCol+1, 1);
}
if( rc==SQLITE_OK ){
sqlite3_step(pApply->pDelete);
rc = sqlite3_reset(pApply->pDelete);
}
if( rc==SQLITE_OK ){
if( rc==SQLITE_OK ){
/* If the bRetry flag is set, the change has not been applied due to an
** SQLITE_CHANGESET_DATA problem (i.e. this is an UPDATE or DELETE and
** a row with the correct PK is present in the db, but one or more other
** fields do not contain the expected values) and the conflict handler
** returned SQLITE_CHANGESET_REPLACE. In this case retry the operation,
** but pass NULL as the final argument so that sessionApplyOneOp() ignores
** the SQLITE_CHANGESET_DATA problem. */
if( bRetry ){
assert( pIter->op==SQLITE_UPDATE || pIter->op==SQLITE_DELETE );
rc = sessionApplyOneOp(pIter, pApply, xConflict, pCtx, 0, 0);
}
if( rc==SQLITE_OK ){
rc = sqlite3_exec(db, "RELEASE replace_op", 0, 0, 0);
/* If the bReplace flag is set, the change is an INSERT that has not
** been performed because the database already contains a row with the
** specified primary key and the conflict handler returned
** SQLITE_CHANGESET_REPLACE. In this case remove the conflicting row
** before reattempting the INSERT. */
else if( bReplace ){
assert( pIter->op==SQLITE_INSERT );
rc = sqlite3_exec(db, "SAVEPOINT replace_op", 0, 0, 0);
if( rc==SQLITE_OK ){
rc = sessionBindRow(pIter,
sqlite3changeset_new, pApply->nCol, pApply->abPK, pApply->pDelete);
sqlite3_bind_int(pApply->pDelete, pApply->nCol+1, 1);
}
if( rc==SQLITE_OK ){
sqlite3_step(pApply->pDelete);
rc = sqlite3_reset(pApply->pDelete);
}
if( rc==SQLITE_OK ){
rc = sessionApplyOneOp(pIter, pApply, xConflict, pCtx, 0, 0);
}
if( rc==SQLITE_OK ){
rc = sqlite3_exec(db, "RELEASE replace_op", 0, 0, 0);
}
}
}
@ -4178,7 +4237,8 @@ static int sessionChangesetApply(
int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */
sqlite3_changeset_iter *p /* Handle describing change and conflict */
),
void *pCtx /* First argument passed to xConflict */
void *pCtx, /* First argument passed to xConflict */
void **ppRebase, int *pnRebase /* OUT: Rebase information */
){
int schemaMismatch = 0;
int rc; /* Return code */
@ -4219,6 +4279,7 @@ static int sessionChangesetApply(
memset(&sApply, 0, sizeof(sApply));
sApply.db = db;
sApply.bDeferConstraints = 1;
sApply.bRebaseStarted = 0;
/* If an xFilter() callback was specified, invoke it now. If the
** xFilter callback returns zero, skip this table. If it returns
@ -4328,16 +4389,48 @@ static int sessionChangesetApply(
sqlite3_exec(db, "RELEASE changeset_apply", 0, 0, 0);
}
if( rc==SQLITE_OK && ppRebase && pnRebase ){
*ppRebase = (void*)sApply.rebase.aBuf;
*pnRebase = sApply.rebase.nBuf;
sApply.rebase.aBuf = 0;
}
sqlite3_finalize(sApply.pInsert);
sqlite3_finalize(sApply.pDelete);
sqlite3_finalize(sApply.pUpdate);
sqlite3_finalize(sApply.pSelect);
sqlite3_free((char*)sApply.azCol); /* cast works around VC++ bug */
sqlite3_free((char*)sApply.constraints.aBuf);
sqlite3_free((char*)sApply.rebase.aBuf);
sqlite3_mutex_leave(sqlite3_db_mutex(db));
return rc;
}
int sqlite3changeset_apply_v2(
sqlite3 *db, /* Apply change to "main" db of this handle */
int nChangeset, /* Size of changeset in bytes */
void *pChangeset, /* Changeset blob */
int(*xFilter)(
void *pCtx, /* Copy of sixth arg to _apply() */
const char *zTab /* Table name */
),
int(*xConflict)(
void *pCtx, /* Copy of sixth arg to _apply() */
int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */
sqlite3_changeset_iter *p /* Handle describing change and conflict */
),
void *pCtx, /* First argument passed to xConflict */
void **ppRebase, int *pnRebase
){
sqlite3_changeset_iter *pIter; /* Iterator to skip through changeset */
int rc = sqlite3changeset_start(&pIter, nChangeset, pChangeset);
if( rc==SQLITE_OK ){
rc = sessionChangesetApply(
db, pIter, xFilter, xConflict, pCtx, ppRebase, pnRebase
);
}
return rc;
}
/*
** Apply the changeset passed via pChangeset/nChangeset to the main database
** attached to handle "db". Invoke the supplied conflict handler callback
@ -4358,12 +4451,9 @@ int sqlite3changeset_apply(
),
void *pCtx /* First argument passed to xConflict */
){
sqlite3_changeset_iter *pIter; /* Iterator to skip through changeset */
int rc = sqlite3changeset_start(&pIter, nChangeset, pChangeset);
if( rc==SQLITE_OK ){
rc = sessionChangesetApply(db, pIter, xFilter, xConflict, pCtx);
}
return rc;
return sqlite3changeset_apply_v2(
db, nChangeset, pChangeset, xFilter, xConflict, pCtx, 0, 0
);
}
/*
@ -4371,6 +4461,31 @@ int sqlite3changeset_apply(
** attached to handle "db". Invoke the supplied conflict handler callback
** to resolve any conflicts encountered while applying the change.
*/
int sqlite3changeset_apply_v2_strm(
sqlite3 *db, /* Apply change to "main" db of this handle */
int (*xInput)(void *pIn, void *pData, int *pnData), /* Input function */
void *pIn, /* First arg for xInput */
int(*xFilter)(
void *pCtx, /* Copy of sixth arg to _apply() */
const char *zTab /* Table name */
),
int(*xConflict)(
void *pCtx, /* Copy of sixth arg to _apply() */
int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */
sqlite3_changeset_iter *p /* Handle describing change and conflict */
),
void *pCtx, /* First argument passed to xConflict */
void **ppRebase, int *pnRebase
){
sqlite3_changeset_iter *pIter; /* Iterator to skip through changeset */
int rc = sqlite3changeset_start_strm(&pIter, xInput, pIn);
if( rc==SQLITE_OK ){
rc = sessionChangesetApply(
db, pIter, xFilter, xConflict, pCtx, ppRebase, pnRebase
);
}
return rc;
}
int sqlite3changeset_apply_strm(
sqlite3 *db, /* Apply change to "main" db of this handle */
int (*xInput)(void *pIn, void *pData, int *pnData), /* Input function */
@ -4386,12 +4501,9 @@ int sqlite3changeset_apply_strm(
),
void *pCtx /* First argument passed to xConflict */
){
sqlite3_changeset_iter *pIter; /* Iterator to skip through changeset */
int rc = sqlite3changeset_start_strm(&pIter, xInput, pIn);
if( rc==SQLITE_OK ){
rc = sessionChangesetApply(db, pIter, xFilter, xConflict, pCtx);
}
return rc;
return sqlite3changeset_apply_v2_strm(
db, xInput, pIn, xFilter, xConflict, pCtx, 0, 0
);
}
/*