mirror of
https://github.com/sqlite/sqlite.git
synced 2025-08-01 06:27:03 +03:00
Have the sqlite3session_apply() function and its streaming equivalent retry any operations that failed with SQLITE_CONSTRAINT after all other operations on the same table have been attempted. New code is largely untested.
FossilOrigin-Name: 1085911afb51744f32fe9db183b50e8e88bdd73e
This commit is contained in:
@ -67,6 +67,8 @@ struct SessionBuffer {
|
||||
** sqlite3changeset_start_strm()).
|
||||
*/
|
||||
struct SessionInput {
|
||||
int bNoDiscard; /* If true, discard no data */
|
||||
int iCurrent; /* Offset in aData[] of current change */
|
||||
int iNext; /* Offset in aData[] of next change */
|
||||
u8 *aData; /* Pointer to buffer containing changeset */
|
||||
int nData; /* Number of bytes in aData */
|
||||
@ -2460,7 +2462,6 @@ static int sessionChangesetStart(
|
||||
pRet->in.nData = nChangeset;
|
||||
pRet->in.xInput = xInput;
|
||||
pRet->in.pIn = pIn;
|
||||
pRet->in.iNext = 0;
|
||||
pRet->in.bEof = (xInput ? 0 : 1);
|
||||
|
||||
/* Populate the output variable and return success. */
|
||||
@ -2490,6 +2491,23 @@ int sqlite3changeset_start_strm(
|
||||
return sessionChangesetStart(pp, xInput, pIn, 0, 0);
|
||||
}
|
||||
|
||||
/*
|
||||
** If the SessionInput object passed as the only argument is a streaming
|
||||
** object and the buffer is full, discard some data to free up space.
|
||||
*/
|
||||
static void sessionDiscardData(SessionInput *pIn){
|
||||
if( pIn->bEof && pIn->xInput && pIn->iNext>=SESSIONS_STRM_CHUNK_SIZE ){
|
||||
int nMove = pIn->buf.nBuf - pIn->iNext;
|
||||
assert( nMove>=0 );
|
||||
if( nMove>0 ){
|
||||
memmove(pIn->buf.aBuf, &pIn->buf.aBuf[pIn->iNext], nMove);
|
||||
}
|
||||
pIn->buf.nBuf -= pIn->iNext;
|
||||
pIn->iNext = 0;
|
||||
pIn->nData = pIn->buf.nBuf;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
** Ensure that there are at least nByte bytes available in the buffer. Or,
|
||||
** if there are not nByte bytes remaining in the input, that all available
|
||||
@ -2503,13 +2521,7 @@ static int sessionInputBuffer(SessionInput *pIn, int nByte){
|
||||
while( !pIn->bEof && (pIn->iNext+nByte)>=pIn->nData && rc==SQLITE_OK ){
|
||||
int nNew = SESSIONS_STRM_CHUNK_SIZE;
|
||||
|
||||
if( pIn->iNext>=SESSIONS_STRM_CHUNK_SIZE ){
|
||||
int nMove = pIn->buf.nBuf - pIn->iNext;
|
||||
memmove(pIn->buf.aBuf, &pIn->buf.aBuf[pIn->iNext], nMove);
|
||||
pIn->buf.nBuf -= pIn->iNext;
|
||||
pIn->iNext = 0;
|
||||
}
|
||||
|
||||
if( pIn->bNoDiscard==0 ) sessionDiscardData(pIn);
|
||||
if( SQLITE_OK==sessionBufferGrow(&pIn->buf, nNew, &rc) ){
|
||||
rc = pIn->xInput(pIn->pIn, &pIn->buf.aBuf[pIn->buf.nBuf], &nNew);
|
||||
if( nNew==0 ){
|
||||
@ -2818,11 +2830,15 @@ static int sessionChangesetNext(
|
||||
return SQLITE_DONE;
|
||||
}
|
||||
|
||||
sessionDiscardData(&p->in);
|
||||
p->in.iCurrent = p->in.iNext;
|
||||
|
||||
op = p->in.aData[p->in.iNext++];
|
||||
if( op=='T' || op=='P' ){
|
||||
p->bPatchset = (op=='P');
|
||||
if( sessionChangesetReadTblhdr(p) ) return p->rc;
|
||||
if( (p->rc = sessionInputBuffer(&p->in, 2)) ) return p->rc;
|
||||
p->in.iCurrent = p->in.iNext;
|
||||
op = p->in.aData[p->in.iNext++];
|
||||
}
|
||||
|
||||
@ -3266,6 +3282,9 @@ struct SessionApplyCtx {
|
||||
int nCol; /* Size of azCol[] and abPK[] arrays */
|
||||
const char **azCol; /* Array of column names */
|
||||
u8 *abPK; /* Boolean array - true if column is in PK */
|
||||
|
||||
int bDeferConstraints; /* True to defer constraints */
|
||||
SessionBuffer constraints; /* Deferred constraints are stored here */
|
||||
};
|
||||
|
||||
/*
|
||||
@ -3516,7 +3535,7 @@ static int sessionBindValue(
|
||||
** transfers new.* values from the current iterator entry to statement
|
||||
** pStmt. The table being inserted into has nCol columns.
|
||||
**
|
||||
** New.* value $i 0 from the iterator is bound to variable ($i+1) of
|
||||
** New.* value $i from the iterator is bound to variable ($i+1) of
|
||||
** statement pStmt. If parameter abPK is NULL, all values from 0 to (nCol-1)
|
||||
** are transfered to the statement. Otherwise, if abPK is not NULL, it points
|
||||
** to an array nCol elements in size. In this case only those values for
|
||||
@ -3662,9 +3681,18 @@ static int sessionConflictHandler(
|
||||
pIter->pConflict = 0;
|
||||
rc = sqlite3_reset(p->pSelect);
|
||||
}else if( rc==SQLITE_OK ){
|
||||
/* No other row with the new.* primary key. */
|
||||
res = xConflict(pCtx, eType+1, pIter);
|
||||
if( res==SQLITE_CHANGESET_REPLACE ) rc = SQLITE_MISUSE;
|
||||
if( p->bDeferConstraints && eType==SQLITE_CHANGESET_CONFLICT ){
|
||||
/* Instead of invoking the conflict handler, append the change blob
|
||||
** to the SessionApplyCtx.constraints buffer. */
|
||||
u8 *aBlob = &pIter->in.aData[pIter->in.iCurrent];
|
||||
int nBlob = pIter->in.iNext - pIter->in.iCurrent;
|
||||
sessionAppendBlob(&p->constraints, aBlob, nBlob, &rc);
|
||||
res = SQLITE_CHANGESET_OMIT;
|
||||
}else{
|
||||
/* No other row with the new.* primary key. */
|
||||
res = xConflict(pCtx, eType+1, pIter);
|
||||
if( res==SQLITE_CHANGESET_REPLACE ) rc = SQLITE_MISUSE;
|
||||
}
|
||||
}
|
||||
|
||||
if( rc==SQLITE_OK ){
|
||||
@ -3824,6 +3852,96 @@ static int sessionApplyOneOp(
|
||||
return rc;
|
||||
}
|
||||
|
||||
static int sessionApplyOneWithRetry(
|
||||
sqlite3 *db, /* Apply change to "main" db of this handle */
|
||||
sqlite3_changeset_iter *pIter, /* Changeset iterator to read change from */
|
||||
SessionApplyCtx *pApply, /* Apply context */
|
||||
int(*xConflict)(void*, int, sqlite3_changeset_iter*),
|
||||
void *pCtx /* First argument passed to xConflict */
|
||||
){
|
||||
int bReplace = 0;
|
||||
int bRetry = 0;
|
||||
int rc;
|
||||
|
||||
rc = sessionApplyOneOp(pIter, pApply, xConflict, pCtx, &bReplace, &bRetry);
|
||||
|
||||
if( rc==SQLITE_OK && bRetry ){
|
||||
rc = sessionApplyOneOp(pIter, pApply, xConflict, pCtx, &bReplace, 0);
|
||||
}
|
||||
|
||||
if( bReplace ){
|
||||
assert( pIter->op==SQLITE_INSERT );
|
||||
rc = sqlite3_exec(db, "SAVEPOINT replace_op", 0, 0, 0);
|
||||
if( rc==SQLITE_OK ){
|
||||
rc = sessionBindRow(pIter,
|
||||
sqlite3changeset_new, pApply->nCol, pApply->abPK, pApply->pDelete);
|
||||
sqlite3_bind_int(pApply->pDelete, pApply->nCol+1, 1);
|
||||
}
|
||||
if( rc==SQLITE_OK ){
|
||||
sqlite3_step(pApply->pDelete);
|
||||
rc = sqlite3_reset(pApply->pDelete);
|
||||
}
|
||||
if( rc==SQLITE_OK ){
|
||||
rc = sessionApplyOneOp(pIter, pApply, xConflict, pCtx, 0, 0);
|
||||
}
|
||||
if( rc==SQLITE_OK ){
|
||||
rc = sqlite3_exec(db, "RELEASE replace_op", 0, 0, 0);
|
||||
}
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
/*
|
||||
** Retry the changes accumulated in the pApply->constraints buffer.
|
||||
*/
|
||||
static int sessionRetryConstraints(
|
||||
sqlite3 *db,
|
||||
int bPatchset,
|
||||
const char *zTab,
|
||||
SessionApplyCtx *pApply,
|
||||
int(*xConflict)(void*, int, sqlite3_changeset_iter*),
|
||||
void *pCtx /* First argument passed to xConflict */
|
||||
){
|
||||
int rc = SQLITE_OK;
|
||||
|
||||
while( pApply->constraints.nBuf ){
|
||||
sqlite3_changeset_iter *pIter2 = 0;
|
||||
SessionBuffer cons = pApply->constraints;
|
||||
memset(&pApply->constraints, 0, sizeof(SessionBuffer));
|
||||
|
||||
rc = sessionChangesetStart(&pIter2, 0, 0, cons.nBuf, cons.aBuf);
|
||||
if( rc==SQLITE_OK ){
|
||||
int nByte = 2*pApply->nCol*sizeof(sqlite3_value*);
|
||||
int rc2;
|
||||
pIter2->bPatchset = bPatchset;
|
||||
pIter2->zTab = (char*)zTab;
|
||||
pIter2->nCol = pApply->nCol;
|
||||
pIter2->abPK = pApply->abPK;
|
||||
sessionBufferGrow(&pIter2->tblhdr, nByte, &rc);
|
||||
pIter2->apValue = (sqlite3_value**)pIter2->tblhdr.aBuf;
|
||||
if( rc==SQLITE_OK ) memset(pIter2->apValue, 0, nByte);
|
||||
|
||||
while( rc==SQLITE_OK && SQLITE_ROW==sqlite3changeset_next(pIter2) ){
|
||||
rc = sessionApplyOneWithRetry(db, pIter2, pApply, xConflict, pCtx);
|
||||
}
|
||||
|
||||
rc2 = sqlite3changeset_finalize(pIter2);
|
||||
if( rc==SQLITE_OK ) rc==rc2;
|
||||
}
|
||||
assert( pApply->bDeferConstraints || pApply->constraints.nBuf==0 );
|
||||
|
||||
sqlite3_free(cons.aBuf);
|
||||
if( rc!=SQLITE_OK ) break;
|
||||
if( pApply->constraints.nBuf>=cons.nBuf ){
|
||||
/* No progress was made on the last round. */
|
||||
pApply->bDeferConstraints = 0;
|
||||
}
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
/*
|
||||
** Argument pIter is a changeset iterator that has been initialized, but
|
||||
** not yet passed to sqlite3changeset_next(). This function applies the
|
||||
@ -3853,6 +3971,7 @@ static int sessionChangesetApply(
|
||||
|
||||
assert( xConflict!=0 );
|
||||
|
||||
pIter->in.bNoDiscard = 1;
|
||||
memset(&sApply, 0, sizeof(sApply));
|
||||
sqlite3_mutex_enter(sqlite3_db_mutex(db));
|
||||
rc = sqlite3_exec(db, "SAVEPOINT changeset_apply", 0, 0, 0);
|
||||
@ -3862,8 +3981,6 @@ static int sessionChangesetApply(
|
||||
while( rc==SQLITE_OK && SQLITE_ROW==sqlite3changeset_next(pIter) ){
|
||||
int nCol;
|
||||
int op;
|
||||
int bReplace = 0;
|
||||
int bRetry = 0;
|
||||
const char *zNew;
|
||||
|
||||
sqlite3changeset_op(pIter, &zNew, &nCol, &op, 0);
|
||||
@ -3871,6 +3988,11 @@ static int sessionChangesetApply(
|
||||
if( zTab==0 || sqlite3_strnicmp(zNew, zTab, nTab+1) ){
|
||||
u8 *abPK;
|
||||
|
||||
rc = sessionRetryConstraints(
|
||||
db, pIter->bPatchset, zTab, &sApply, xConflict, pCtx
|
||||
);
|
||||
if( rc!=SQLITE_OK ) break;
|
||||
|
||||
sqlite3_free((char*)sApply.azCol); /* cast works around VC++ bug */
|
||||
sqlite3_finalize(sApply.pDelete);
|
||||
sqlite3_finalize(sApply.pUpdate);
|
||||
@ -3878,6 +4000,7 @@ static int sessionChangesetApply(
|
||||
sqlite3_finalize(sApply.pSelect);
|
||||
memset(&sApply, 0, sizeof(sApply));
|
||||
sApply.db = db;
|
||||
sApply.bDeferConstraints = 1;
|
||||
|
||||
/* If an xFilter() callback was specified, invoke it now. If the
|
||||
** xFilter callback returns zero, skip this table. If it returns
|
||||
@ -3933,31 +4056,13 @@ static int sessionChangesetApply(
|
||||
** next change. A log message has already been issued. */
|
||||
if( schemaMismatch ) continue;
|
||||
|
||||
rc = sessionApplyOneOp(pIter, &sApply, xConflict, pCtx, &bReplace, &bRetry);
|
||||
rc = sessionApplyOneWithRetry(db, pIter, &sApply, xConflict, pCtx);
|
||||
}
|
||||
|
||||
if( rc==SQLITE_OK && bRetry ){
|
||||
rc = sessionApplyOneOp(pIter, &sApply, xConflict, pCtx, &bReplace, 0);
|
||||
}
|
||||
|
||||
if( bReplace ){
|
||||
assert( pIter->op==SQLITE_INSERT );
|
||||
rc = sqlite3_exec(db, "SAVEPOINT replace_op", 0, 0, 0);
|
||||
if( rc==SQLITE_OK ){
|
||||
rc = sessionBindRow(pIter,
|
||||
sqlite3changeset_new, sApply.nCol, sApply.abPK, sApply.pDelete);
|
||||
sqlite3_bind_int(sApply.pDelete, sApply.nCol+1, 1);
|
||||
}
|
||||
if( rc==SQLITE_OK ){
|
||||
sqlite3_step(sApply.pDelete);
|
||||
rc = sqlite3_reset(sApply.pDelete);
|
||||
}
|
||||
if( rc==SQLITE_OK ){
|
||||
rc = sessionApplyOneOp(pIter, &sApply, xConflict, pCtx, 0, 0);
|
||||
}
|
||||
if( rc==SQLITE_OK ){
|
||||
rc = sqlite3_exec(db, "RELEASE replace_op", 0, 0, 0);
|
||||
}
|
||||
}
|
||||
if( rc==SQLITE_OK ){
|
||||
rc = sessionRetryConstraints(
|
||||
db, pIter->bPatchset, zTab, &sApply, xConflict, pCtx
|
||||
);
|
||||
}
|
||||
|
||||
if( rc==SQLITE_OK ){
|
||||
@ -3994,6 +4099,7 @@ static int sessionChangesetApply(
|
||||
sqlite3_finalize(sApply.pUpdate);
|
||||
sqlite3_finalize(sApply.pSelect);
|
||||
sqlite3_free((char*)sApply.azCol); /* cast works around VC++ bug */
|
||||
sqlite3_free((char*)sApply.constraints.aBuf);
|
||||
sqlite3_mutex_leave(sqlite3_db_mutex(db));
|
||||
return rc;
|
||||
}
|
||||
|
Reference in New Issue
Block a user