mirror of
https://github.com/sqlite/sqlite.git
synced 2025-07-30 19:03:16 +03:00
Add streaming version of sqlite3changeset_apply(). Tests and fixes for the same and sqlite3changeset_start_str().
FossilOrigin-Name: b917fc146876f764442de08d5ec36e5b4cf5ab52
This commit is contained in:
@ -17,7 +17,11 @@ typedef struct SessionInput SessionInput;
|
||||
/*
|
||||
** Minimum chunk size used by streaming versions of functions.
|
||||
*/
|
||||
#ifdef SQLITE_TEST
|
||||
#define SESSIONS_STR_CHUNK_SIZE 1
|
||||
#else
|
||||
#define SESSIONS_STR_CHUNK_SIZE 1024
|
||||
#endif
|
||||
|
||||
/*
|
||||
** Session handle structure.
|
||||
@ -51,9 +55,10 @@ struct SessionBuffer {
|
||||
** a stream function (sqlite3changeset_start_str()).
|
||||
*/
|
||||
struct SessionInput {
|
||||
int iNext; /* Offset in aChangeset[] of next change */
|
||||
u8 *aChangeset; /* Pointer to buffer containing changeset */
|
||||
int nChangeset; /* Number of bytes in aChangeset */
|
||||
int iNext; /* Offset in aData[] of next change */
|
||||
u8 *aData; /* Pointer to buffer containing changeset */
|
||||
int nData; /* Number of bytes in aData */
|
||||
|
||||
SessionBuffer buf; /* Current read buffer */
|
||||
int (*xInput)(void*, void*, int*); /* Input stream call (or NULL) */
|
||||
void *pIn; /* First argument to xInput */
|
||||
@ -2033,8 +2038,8 @@ int sessionChangesetStart(
|
||||
pRet = (sqlite3_changeset_iter *)sqlite3_malloc(nByte);
|
||||
if( !pRet ) return SQLITE_NOMEM;
|
||||
memset(pRet, 0, sizeof(sqlite3_changeset_iter));
|
||||
pRet->in.aChangeset = (u8 *)pChangeset;
|
||||
pRet->in.nChangeset = nChangeset;
|
||||
pRet->in.aData = (u8 *)pChangeset;
|
||||
pRet->in.nData = nChangeset;
|
||||
pRet->in.xInput = xInput;
|
||||
pRet->in.pIn = pIn;
|
||||
pRet->in.iNext = 0;
|
||||
@ -2074,10 +2079,31 @@ int sqlite3changeset_start_str(
|
||||
**
|
||||
** Return an SQLite error code if an error occurs, or SQLITE_OK otherwise.
|
||||
*/
|
||||
static int sessionInputBuffer(SessionInput *pInput, int nByte){
|
||||
static int sessionInputBuffer(SessionInput *pIn, int nByte){
|
||||
int rc = SQLITE_OK;
|
||||
if( pInput->xInput && !pInput->bEof ){
|
||||
assert( 0 );
|
||||
if( pIn->xInput ){
|
||||
while( !pIn->bEof && (pIn->iNext+nByte)>=pIn->nData && rc==SQLITE_OK ){
|
||||
int nNew = SESSIONS_STR_CHUNK_SIZE;
|
||||
|
||||
if( pIn->iNext>=SESSIONS_STR_CHUNK_SIZE ){
|
||||
int nMove = pIn->buf.nBuf - pIn->iNext;
|
||||
memmove(pIn->buf.aBuf, &pIn->buf.aBuf[pIn->iNext], nMove);
|
||||
pIn->buf.nBuf -= pIn->iNext;
|
||||
pIn->iNext = 0;
|
||||
}
|
||||
|
||||
if( SQLITE_OK==sessionBufferGrow(&pIn->buf, nNew, &rc) ){
|
||||
rc = pIn->xInput(pIn->pIn, &pIn->buf.aBuf[pIn->buf.nBuf], &nNew);
|
||||
if( nNew==0 ){
|
||||
pIn->bEof = 1;
|
||||
}else{
|
||||
pIn->buf.nBuf += nNew;
|
||||
}
|
||||
}
|
||||
|
||||
pIn->aData = pIn->buf.aBuf;
|
||||
pIn->nData = pIn->buf.nBuf;
|
||||
}
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
@ -2107,6 +2133,25 @@ static void sessionSkipRecord(
|
||||
*ppRec = aRec;
|
||||
}
|
||||
|
||||
/*
|
||||
** This function sets the value of the sqlite3_value object passed as the
|
||||
** first argument to a copy of the string or blob held in the aData[]
|
||||
** buffer. SQLITE_OK is returned if successful, or SQLITE_NOMEM if an OOM
|
||||
** error occurs.
|
||||
*/
|
||||
static int sessionValueSetStr(
|
||||
sqlite3_value *pVal, /* Set the value of this object */
|
||||
u8 *aData, /* Buffer containing string or blob data */
|
||||
int nData, /* Size of buffer aData[] in bytes */
|
||||
u8 enc /* String encoding (0 for blobs) */
|
||||
){
|
||||
u8 *aCopy = sqlite3_malloc(nData);
|
||||
if( aCopy==0 ) return SQLITE_NOMEM;
|
||||
memcpy(aCopy, aData, nData);
|
||||
sqlite3ValueSetStr(pVal, nData, (char*)aCopy, enc, sqlite3_free);
|
||||
return SQLITE_OK;
|
||||
}
|
||||
|
||||
/*
|
||||
** Deserialize a single record from a buffer in memory. See "RECORD FORMAT"
|
||||
** for details.
|
||||
@ -2145,7 +2190,7 @@ static int sessionReadRecord(
|
||||
if( abPK && abPK[i]==0 ) continue;
|
||||
rc = sessionInputBuffer(pIn, 9);
|
||||
if( rc==SQLITE_OK ){
|
||||
eType = pIn->aChangeset[pIn->iNext++];
|
||||
eType = pIn->aData[pIn->iNext++];
|
||||
}
|
||||
|
||||
assert( !apOut || apOut[i]==0 );
|
||||
@ -2157,15 +2202,14 @@ static int sessionReadRecord(
|
||||
}
|
||||
|
||||
if( rc==SQLITE_OK ){
|
||||
u8 *aVal = &pIn->aChangeset[pIn->iNext];
|
||||
u8 *aVal = &pIn->aData[pIn->iNext];
|
||||
if( eType==SQLITE_TEXT || eType==SQLITE_BLOB ){
|
||||
int nByte;
|
||||
pIn->iNext += sessionVarintGet(aVal, &nByte);
|
||||
rc = sessionInputBuffer(pIn, nByte);
|
||||
if( apOut && rc==SQLITE_OK ){
|
||||
u8 *aRec = &pIn->aChangeset[pIn->iNext];
|
||||
u8 enc = (eType==SQLITE_TEXT ? SQLITE_UTF8 : 0);
|
||||
sqlite3ValueSetStr(apOut[i], nByte, (char *)aRec, enc, SQLITE_STATIC);
|
||||
rc = sessionValueSetStr(apOut[i],&pIn->aData[pIn->iNext],nByte,enc);
|
||||
}
|
||||
pIn->iNext += nByte;
|
||||
}
|
||||
@ -2204,20 +2248,23 @@ static int sessionReadRecord(
|
||||
static int sessionChangesetBufferTblhdr(SessionInput *pIn, int *pnByte){
|
||||
int rc = SQLITE_OK;
|
||||
int nCol = 0;
|
||||
int iIn = pIn->iNext;
|
||||
int nRead = 0;
|
||||
|
||||
rc = sessionInputBuffer(pIn, 9);
|
||||
if( rc==SQLITE_OK ){
|
||||
iIn += sessionVarintGet(&pIn->aChangeset[iIn], &nCol);
|
||||
rc = sessionInputBuffer(pIn, nCol+100);
|
||||
iIn += nCol;
|
||||
nRead += sessionVarintGet(&pIn->aData[pIn->iNext + nRead], &nCol);
|
||||
rc = sessionInputBuffer(pIn, nRead+nCol+100);
|
||||
nRead += nCol;
|
||||
}
|
||||
|
||||
while( rc==SQLITE_OK ){
|
||||
while( iIn<pIn->nChangeset && pIn->aChangeset[iIn] ) iIn++;
|
||||
if( pIn->aChangeset[iIn]==0 ) break;
|
||||
rc = sessionInputBuffer(pIn, 100);
|
||||
while( (pIn->iNext + nRead)<pIn->nData && pIn->aData[pIn->iNext + nRead] ){
|
||||
nRead++;
|
||||
}
|
||||
if( pIn->aData[pIn->iNext + nRead]==0 ) break;
|
||||
rc = sessionInputBuffer(pIn, nRead + 100);
|
||||
}
|
||||
if( pnByte ) *pnByte = (iIn+1 - pIn->iNext);
|
||||
if( pnByte ) *pnByte = nRead+1;
|
||||
return rc;
|
||||
}
|
||||
|
||||
@ -2238,7 +2285,7 @@ static int sessionChangesetReadTblhdr(sqlite3_changeset_iter *p){
|
||||
if( rc==SQLITE_OK ){
|
||||
int nByte;
|
||||
int nVarint;
|
||||
nVarint = sessionVarintGet(&p->in.aChangeset[p->in.iNext], &p->nCol);
|
||||
nVarint = sessionVarintGet(&p->in.aData[p->in.iNext], &p->nCol);
|
||||
nCopy -= nVarint;
|
||||
p->in.iNext += nVarint;
|
||||
nByte = p->nCol * sizeof(sqlite3_value*) * 2 + nCopy;
|
||||
@ -2249,7 +2296,7 @@ static int sessionChangesetReadTblhdr(sqlite3_changeset_iter *p){
|
||||
if( rc==SQLITE_OK ){
|
||||
int iPK = sizeof(sqlite3_value*)*p->nCol*2;
|
||||
memset(p->tblhdr.aBuf, 0, iPK);
|
||||
memcpy(&p->tblhdr.aBuf[iPK], &p->in.aChangeset[p->in.iNext], nCopy);
|
||||
memcpy(&p->tblhdr.aBuf[iPK], &p->in.aData[p->in.iNext], nCopy);
|
||||
p->in.iNext += nCopy;
|
||||
}
|
||||
|
||||
@ -2305,25 +2352,25 @@ static int sessionChangesetNext(
|
||||
if( p->rc!=SQLITE_OK ) return p->rc;
|
||||
|
||||
/* If the iterator is already at the end of the changeset, return DONE. */
|
||||
if( p->in.iNext>=p->in.nChangeset ){
|
||||
if( p->in.iNext>=p->in.nData ){
|
||||
return SQLITE_DONE;
|
||||
}
|
||||
|
||||
op = p->in.aChangeset[p->in.iNext++];
|
||||
op = p->in.aData[p->in.iNext++];
|
||||
if( op=='T' || op=='P' ){
|
||||
p->bPatchset = (op=='P');
|
||||
if( sessionChangesetReadTblhdr(p) ) return p->rc;
|
||||
if( (p->rc = sessionInputBuffer(&p->in, 2)) ) return p->rc;
|
||||
op = p->in.aChangeset[p->in.iNext++];
|
||||
op = p->in.aData[p->in.iNext++];
|
||||
}
|
||||
|
||||
p->op = op;
|
||||
p->bIndirect = p->in.aChangeset[p->in.iNext++];
|
||||
p->bIndirect = p->in.aData[p->in.iNext++];
|
||||
if( p->op!=SQLITE_UPDATE && p->op!=SQLITE_DELETE && p->op!=SQLITE_INSERT ){
|
||||
return (p->rc = SQLITE_CORRUPT);
|
||||
return (p->rc = SQLITE_CORRUPT_BKPT);
|
||||
}
|
||||
|
||||
if( paRec ){ *paRec = &p->in.aChangeset[p->in.iNext]; }
|
||||
if( paRec ){ *paRec = &p->in.aData[p->in.iNext]; }
|
||||
|
||||
/* If this is an UPDATE or DELETE, read the old.* record. */
|
||||
if( p->op!=SQLITE_INSERT && (p->bPatchset==0 || p->op==SQLITE_DELETE) ){
|
||||
@ -2340,7 +2387,7 @@ static int sessionChangesetNext(
|
||||
}
|
||||
|
||||
if( pnRec ){
|
||||
*pnRec = (int)(&p->in.aChangeset[p->in.iNext] - *paRec);
|
||||
*pnRec = (int)(&p->in.aData[p->in.iNext] - *paRec);
|
||||
}else if( p->bPatchset && p->op==SQLITE_UPDATE ){
|
||||
/* If this is an UPDATE that is part of a patchset, then all PK and
|
||||
** modified fields are present in the new.* record. The old.* record
|
||||
@ -2530,6 +2577,7 @@ int sqlite3changeset_finalize(sqlite3_changeset_iter *p){
|
||||
for(i=0; i<p->nCol*2; i++) sqlite3ValueFree(p->apValue[i]);
|
||||
}
|
||||
sqlite3_free(p->tblhdr.aBuf);
|
||||
sqlite3_free(p->in.buf.aBuf);
|
||||
sqlite3_free(p);
|
||||
return rc;
|
||||
}
|
||||
@ -2560,8 +2608,8 @@ int sqlite3changeset_invert(
|
||||
|
||||
/* Set up the input stream */
|
||||
memset(&sInput, 0, sizeof(SessionInput));
|
||||
sInput.nChangeset = nChangeset;
|
||||
sInput.aChangeset = (u8*)pChangeset;
|
||||
sInput.nData = nChangeset;
|
||||
sInput.aData = (u8*)pChangeset;
|
||||
|
||||
aOut = (u8 *)sqlite3_malloc(nChangeset);
|
||||
if( !aOut ) return SQLITE_NOMEM;
|
||||
@ -2571,7 +2619,7 @@ int sqlite3changeset_invert(
|
||||
while( i<nChangeset ){
|
||||
u8 eType;
|
||||
if( (rc = sessionInputBuffer(&sInput, 2)) ) goto finished_invert;
|
||||
eType = sInput.aChangeset[sInput.iNext];
|
||||
eType = sInput.aData[sInput.iNext];
|
||||
switch( eType ){
|
||||
case 'T': {
|
||||
/* A 'table' record consists of:
|
||||
@ -2588,12 +2636,12 @@ int sqlite3changeset_invert(
|
||||
if( (rc = sessionChangesetBufferTblhdr(&sInput, &nByte)) ){
|
||||
goto finished_invert;
|
||||
}
|
||||
nVarint = sessionVarintGet(&sInput.aChangeset[iNext+1], &nCol);
|
||||
nVarint = sessionVarintGet(&sInput.aData[iNext+1], &nCol);
|
||||
sPK.nBuf = 0;
|
||||
sessionAppendBlob(&sPK, &sInput.aChangeset[iNext+1+nVarint], nCol, &rc);
|
||||
sessionAppendBlob(&sPK, &sInput.aData[iNext+1+nVarint], nCol, &rc);
|
||||
if( rc ) goto finished_invert;
|
||||
sInput.iNext += nByte;
|
||||
memcpy(&aOut[i], &sInput.aChangeset[iNext], nByte+1);
|
||||
memcpy(&aOut[i], &sInput.aData[iNext], nByte+1);
|
||||
i += nByte+1;
|
||||
sqlite3_free(apVal);
|
||||
apVal = 0;
|
||||
@ -2611,7 +2659,7 @@ int sqlite3changeset_invert(
|
||||
aOut[i] = (eType==SQLITE_DELETE ? SQLITE_INSERT : SQLITE_DELETE);
|
||||
aOut[i+1] = aIn[i+1]; /* indirect-flag */
|
||||
nByte = sInput.iNext - iStart;
|
||||
memcpy(&aOut[i+2], &sInput.aChangeset[iStart], nByte);
|
||||
memcpy(&aOut[i+2], &sInput.aData[iStart], nByte);
|
||||
i += 2 + nByte;
|
||||
break;
|
||||
}
|
||||
@ -2631,7 +2679,7 @@ int sqlite3changeset_invert(
|
||||
|
||||
/* Write the header for the new UPDATE change. Same as the original. */
|
||||
aOut[i] = SQLITE_UPDATE;
|
||||
aOut[i+1] = sInput.aChangeset[sInput.iNext+1];
|
||||
aOut[i+1] = sInput.aData[sInput.iNext+1];
|
||||
nWrite = 2;
|
||||
|
||||
/* Read the old.* and new.* records for the update change. */
|
||||
@ -2671,7 +2719,7 @@ int sqlite3changeset_invert(
|
||||
}
|
||||
|
||||
default:
|
||||
rc = SQLITE_CORRUPT;
|
||||
rc = SQLITE_CORRUPT_BKPT;
|
||||
goto finished_invert;
|
||||
}
|
||||
}
|
||||
@ -3258,14 +3306,15 @@ static int sessionApplyOneOp(
|
||||
}
|
||||
|
||||
/*
|
||||
** Apply the changeset passed via pChangeset/nChangeset to the main database
|
||||
** attached to handle "db". Invoke the supplied conflict handler callback
|
||||
** to resolve any conflicts encountered while applying the change.
|
||||
** Argument pIter is a changeset iterator that has been initialized, but
|
||||
** not yet passed to sqlite3changeset_next(). This function applies the
|
||||
** changeset to the main database attached to handle "db". The supplied
|
||||
** conflict handler callback is invoked to resolve any conflicts encountered
|
||||
** while applying the change.
|
||||
*/
|
||||
int sqlite3changeset_apply(
|
||||
static int sessionChangesetApply(
|
||||
sqlite3 *db, /* Apply change to "main" db of this handle */
|
||||
int nChangeset, /* Size of changeset in bytes */
|
||||
void *pChangeset, /* Changeset blob */
|
||||
sqlite3_changeset_iter *pIter, /* Changeset to apply */
|
||||
int(*xFilter)(
|
||||
void *pCtx, /* Copy of sixth arg to _apply() */
|
||||
const char *zTab /* Table name */
|
||||
@ -3278,7 +3327,6 @@ int sqlite3changeset_apply(
|
||||
void *pCtx /* First argument passed to xConflict */
|
||||
){
|
||||
int schemaMismatch = 0;
|
||||
sqlite3_changeset_iter *pIter; /* Iterator to skip through changeset */
|
||||
int rc; /* Return code */
|
||||
const char *zTab = 0; /* Name of current table */
|
||||
int nTab = 0; /* Result of sqlite3Strlen30(zTab) */
|
||||
@ -3287,9 +3335,6 @@ int sqlite3changeset_apply(
|
||||
assert( xConflict!=0 );
|
||||
|
||||
memset(&sApply, 0, sizeof(sApply));
|
||||
rc = sqlite3changeset_start(&pIter, nChangeset, pChangeset);
|
||||
if( rc!=SQLITE_OK ) return rc;
|
||||
|
||||
sqlite3_mutex_enter(sqlite3_db_mutex(db));
|
||||
rc = sqlite3_exec(db, "SAVEPOINT changeset_apply", 0, 0, 0);
|
||||
if( rc==SQLITE_OK ){
|
||||
@ -3430,6 +3475,62 @@ int sqlite3changeset_apply(
|
||||
return rc;
|
||||
}
|
||||
|
||||
/*
|
||||
** Apply the changeset passed via pChangeset/nChangeset to the main database
|
||||
** attached to handle "db". Invoke the supplied conflict handler callback
|
||||
** to resolve any conflicts encountered while applying the change.
|
||||
*/
|
||||
int sqlite3changeset_apply(
|
||||
sqlite3 *db, /* Apply change to "main" db of this handle */
|
||||
int nChangeset, /* Size of changeset in bytes */
|
||||
void *pChangeset, /* Changeset blob */
|
||||
int(*xFilter)(
|
||||
void *pCtx, /* Copy of sixth arg to _apply() */
|
||||
const char *zTab /* Table name */
|
||||
),
|
||||
int(*xConflict)(
|
||||
void *pCtx, /* Copy of fifth arg to _apply() */
|
||||
int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */
|
||||
sqlite3_changeset_iter *p /* Handle describing change and conflict */
|
||||
),
|
||||
void *pCtx /* First argument passed to xConflict */
|
||||
){
|
||||
sqlite3_changeset_iter *pIter; /* Iterator to skip through changeset */
|
||||
int rc = sqlite3changeset_start(&pIter, nChangeset, pChangeset);
|
||||
if( rc==SQLITE_OK ){
|
||||
rc = sessionChangesetApply(db, pIter, xFilter, xConflict, pCtx);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
/*
|
||||
** Apply the changeset passed via xInput/pIn to the main database
|
||||
** attached to handle "db". Invoke the supplied conflict handler callback
|
||||
** to resolve any conflicts encountered while applying the change.
|
||||
*/
|
||||
int sqlite3changeset_apply_str(
|
||||
sqlite3 *db, /* Apply change to "main" db of this handle */
|
||||
int (*xInput)(void *pIn, void *pData, int *pnData), /* Input function */
|
||||
void *pIn, /* First arg for xInput */
|
||||
int(*xFilter)(
|
||||
void *pCtx, /* Copy of sixth arg to _apply() */
|
||||
const char *zTab /* Table name */
|
||||
),
|
||||
int(*xConflict)(
|
||||
void *pCtx, /* Copy of sixth arg to _apply() */
|
||||
int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */
|
||||
sqlite3_changeset_iter *p /* Handle describing change and conflict */
|
||||
),
|
||||
void *pCtx /* First argument passed to xConflict */
|
||||
){
|
||||
sqlite3_changeset_iter *pIter; /* Iterator to skip through changeset */
|
||||
int rc = sqlite3changeset_start_str(&pIter, xInput, pIn);
|
||||
if( rc==SQLITE_OK ){
|
||||
rc = sessionChangesetApply(db, pIter, xFilter, xConflict, pCtx);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
/*
|
||||
** This function is called to merge two changes to the same row together as
|
||||
** part of an sqlite3changeset_concat() operation. A new change object is
|
||||
|
Reference in New Issue
Block a user