1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-07-27 20:41:58 +03:00

Add streaming version of sqlite3changeset_concat().

FossilOrigin-Name: 88eb6656bdb047a104837a2e15e7fe18c0a7a159
This commit is contained in:
dan
2014-09-25 20:43:28 +00:00
parent fa122adac1
commit cbf6d2d2aa
6 changed files with 203 additions and 99 deletions

View File

@ -2386,7 +2386,6 @@ static int sessionChangesetNext(
int i;
u8 op;
assert( paRec==0 || p->in.xInput==0 ); /* fixme! */
assert( (paRec==0 && pnRec==0) || (paRec && pnRec) );
/* If the iterator is in the error-state, return immediately. */
@ -2426,36 +2425,48 @@ static int sessionChangesetNext(
return (p->rc = SQLITE_CORRUPT_BKPT);
}
if( paRec ){ *paRec = &p->in.aData[p->in.iNext]; }
/* If this is an UPDATE or DELETE, read the old.* record. */
if( p->op!=SQLITE_INSERT && (p->bPatchset==0 || p->op==SQLITE_DELETE) ){
u8 *abPK = p->bPatchset ? p->abPK : 0;
p->rc = sessionReadRecord(&p->in, p->nCol, abPK, paRec?0:p->apValue);
if( paRec ){
int nVal; /* Number of values to buffer */
if( p->bPatchset==0 && op==SQLITE_UPDATE ){
nVal = p->nCol * 2;
}else if( p->bPatchset && op==SQLITE_DELETE ){
nVal = 0;
for(i=0; i<p->nCol; i++) if( p->abPK[i] ) nVal++;
}else{
nVal = p->nCol;
}
p->rc = sessionChangesetBufferRecord(&p->in, nVal, pnRec);
if( p->rc!=SQLITE_OK ) return p->rc;
}
*paRec = &p->in.aData[p->in.iNext];
p->in.iNext += *pnRec;
}else{
/* If this is an INSERT or UPDATE, read the new.* record. */
if( p->op!=SQLITE_DELETE ){
sqlite3_value **apOut = (paRec ? 0 : &p->apValue[p->nCol]);
p->rc = sessionReadRecord(&p->in, p->nCol, 0, apOut);
if( p->rc!=SQLITE_OK ) return p->rc;
}
/* If this is an UPDATE or DELETE, read the old.* record. */
if( p->op!=SQLITE_INSERT && (p->bPatchset==0 || p->op==SQLITE_DELETE) ){
u8 *abPK = p->bPatchset ? p->abPK : 0;
p->rc = sessionReadRecord(&p->in, p->nCol, abPK, p->apValue);
if( p->rc!=SQLITE_OK ) return p->rc;
}
if( pnRec ){
*pnRec = (int)(&p->in.aData[p->in.iNext] - *paRec);
}else if( p->bPatchset && p->op==SQLITE_UPDATE ){
/* If this is an UPDATE that is part of a patchset, then all PK and
** modified fields are present in the new.* record. The old.* record
** is currently completely empty. This block shifts the PK fields from
** new.* to old.*, to accommodate the code that reads these arrays. */
int i;
for(i=0; i<p->nCol; i++){
assert( p->apValue[i]==0 );
assert( p->abPK[i]==0 || p->apValue[i+p->nCol] );
if( p->abPK[i] ){
p->apValue[i] = p->apValue[i+p->nCol];
p->apValue[i+p->nCol] = 0;
/* If this is an INSERT or UPDATE, read the new.* record. */
if( p->op!=SQLITE_DELETE ){
p->rc = sessionReadRecord(&p->in, p->nCol, 0, &p->apValue[p->nCol]);
if( p->rc!=SQLITE_OK ) return p->rc;
}
if( p->bPatchset && p->op==SQLITE_UPDATE ){
/* If this is an UPDATE that is part of a patchset, then all PK and
** modified fields are present in the new.* record. The old.* record
** is currently completely empty. This block shifts the PK fields from
** new.* to old.*, to accommodate the code that reads these arrays. */
int i;
for(i=0; i<p->nCol; i++){
assert( p->apValue[i]==0 );
assert( p->abPK[i]==0 || p->apValue[i+p->nCol] );
if( p->abPK[i] ){
p->apValue[i] = p->apValue[i+p->nCol];
p->apValue[i+p->nCol] = 0;
}
}
}
}
@ -2627,14 +2638,17 @@ int sqlite3changeset_fk_conflicts(
** callback by changeset_apply().
*/
int sqlite3changeset_finalize(sqlite3_changeset_iter *p){
int i; /* Used to iterate through p->apValue[] */
int rc = p->rc; /* Return code */
if( p->apValue ){
for(i=0; i<p->nCol*2; i++) sqlite3ValueFree(p->apValue[i]);
int rc = SQLITE_OK;
if( p ){
int i; /* Used to iterate through p->apValue[] */
rc = p->rc;
if( p->apValue ){
for(i=0; i<p->nCol*2; i++) sqlite3ValueFree(p->apValue[i]);
}
sqlite3_free(p->tblhdr.aBuf);
sqlite3_free(p->in.buf.aBuf);
sqlite3_free(p);
}
sqlite3_free(p->tblhdr.aBuf);
sqlite3_free(p->in.buf.aBuf);
sqlite3_free(p);
return rc;
}
@ -3647,7 +3661,7 @@ static int sessionChangeMerge(
SessionChange *pNew = 0;
if( !pExist ){
pNew = (SessionChange *)sqlite3_malloc(sizeof(SessionChange));
pNew = (SessionChange *)sqlite3_malloc(sizeof(SessionChange) + nRec);
if( !pNew ){
return SQLITE_NOMEM;
}
@ -3655,7 +3669,8 @@ static int sessionChangeMerge(
pNew->op = op2;
pNew->bIndirect = bIndirect;
pNew->nRecord = nRec;
pNew->aRecord = aRec;
pNew->aRecord = (u8*)&pNew[1];
memcpy(pNew->aRecord, aRec, nRec);
}else{
int op1 = pExist->op;
@ -3751,21 +3766,15 @@ static int sessionChangeMerge(
** Add all changes in the changeset passed via the first two arguments to
** hash tables.
*/
static int sessionConcatChangeset(
int bPatchset, /* True to expect patchsets */
int nChangeset, /* Number of bytes in pChangeset */
void *pChangeset, /* Changeset buffer */
static int sessionAddChangeset(
sqlite3_changeset_iter *pIter, /* Iterator to read from */
SessionTable **ppTabList /* IN/OUT: List of table objects */
){
u8 *aRec;
int nRec;
sqlite3_changeset_iter *pIter;
int rc;
int rc = SQLITE_OK;
SessionTable *pTab = 0;
rc = sqlite3changeset_start(&pIter, nChangeset, pChangeset);
if( rc!=SQLITE_OK ) return rc;
while( SQLITE_ROW==sessionChangesetNext(pIter, &aRec, &nRec) ){
const char *zNew;
int nCol;
@ -3776,12 +3785,14 @@ static int sessionConcatChangeset(
SessionChange *pExist = 0;
SessionChange **pp;
#if 0
assert( bPatchset==0 || bPatchset==1 );
assert( pIter->bPatchset==0 || pIter->bPatchset==1 );
if( pIter->bPatchset!=bPatchset ){
rc = SQLITE_ERROR;
break;
}
#endif
sqlite3changeset_op(pIter, &zNew, &nCol, &op, &bIndirect);
if( !pTab || sqlite3_stricmp(zNew, pTab->zName) ){
@ -3813,12 +3824,12 @@ static int sessionConcatChangeset(
}
}
if( sessionGrowHash(bPatchset, pTab) ){
if( sessionGrowHash(pIter->bPatchset, pTab) ){
rc = SQLITE_NOMEM;
break;
}
iHash = sessionChangeHash(
pTab, (bPatchset && op==SQLITE_DELETE), aRec, pTab->nChange
pTab, (pIter->bPatchset && op==SQLITE_DELETE), aRec, pTab->nChange
);
/* Search for existing entry. If found, remove it from the hash table.
@ -3827,7 +3838,7 @@ static int sessionConcatChangeset(
for(pp=&pTab->apChange[iHash]; *pp; pp=&(*pp)->pNext){
int bPkOnly1 = 0;
int bPkOnly2 = 0;
if( bPatchset ){
if( pIter->bPatchset ){
bPkOnly1 = (*pp)->op==SQLITE_DELETE;
bPkOnly2 = op==SQLITE_DELETE;
}
@ -3840,7 +3851,7 @@ static int sessionConcatChangeset(
}
rc = sessionChangeMerge(pTab,
bPatchset, pExist, op, bIndirect, aRec, nRec, &pChange
pIter->bPatchset, pExist, op, bIndirect, aRec, nRec, &pChange
);
if( rc ) break;
if( pChange ){
@ -3850,15 +3861,10 @@ static int sessionConcatChangeset(
}
}
if( rc==SQLITE_OK ){
rc = sqlite3changeset_finalize(pIter);
}else{
sqlite3changeset_finalize(pIter);
}
if( rc==SQLITE_OK ) rc = pIter->rc;
return rc;
}
/*
** 1. Iterate through the left-hand changeset. Add an entry to a table
** specific hash table for each change in the changeset. The hash table
@ -3870,26 +3876,25 @@ static int sessionConcatChangeset(
**
** 3. Write an output changeset based on the contents of the hash table.
*/
int sqlite3changeset_concat(
int nLeft, /* Number of bytes in lhs input */
void *pLeft, /* Lhs input changeset */
int nRight /* Number of bytes in rhs input */,
void *pRight, /* Rhs input changeset */
int *pnOut, /* OUT: Number of bytes in output changeset */
void **ppOut /* OUT: changeset (left <concat> right) */
int sessionChangesetConcat(
sqlite3_changeset_iter *pLeft,
sqlite3_changeset_iter *pRight,
int (*xOutput)(void *pOut, const void *pData, int nData),
void *pOut,
int *pnOut,
void **ppOut
){
SessionTable *pList = 0; /* List of SessionTable objects */
int rc; /* Return code */
int bPatch; /* True for a patchset */
*pnOut = 0;
*ppOut = 0;
bPatch = (nLeft>0 && *(char*)pLeft=='P') || (nRight>0 && *(char*)pRight=='P');
assert( xOutput==0 || (ppOut==0 && pnOut==0) );
rc = sessionConcatChangeset(bPatch, nLeft, pLeft, &pList);
rc = sessionAddChangeset(pLeft, &pList);
if( rc==SQLITE_OK ){
rc = sessionConcatChangeset(bPatch, nRight, pRight, &pList);
rc = sessionAddChangeset(pRight, &pList);
}
bPatch = pLeft->bPatchset || pRight->bPatchset;
/* Create the serialized output changeset based on the contents of the
** hash tables attached to the SessionTable objects in list pList.
@ -3897,7 +3902,7 @@ int sqlite3changeset_concat(
if( rc==SQLITE_OK ){
SessionTable *pTab;
SessionBuffer buf = {0, 0, 0};
for(pTab=pList; pTab; pTab=pTab->pNext){
for(pTab=pList; pTab && rc==SQLITE_OK; pTab=pTab->pNext){
int i;
if( pTab->nEntry==0 ) continue;
@ -3910,18 +3915,85 @@ int sqlite3changeset_concat(
sessionAppendBlob(&buf, p->aRecord, p->nRecord, &rc);
}
}
if( rc==SQLITE_OK && xOutput && buf.nBuf>=SESSIONS_STR_CHUNK_SIZE ){
rc = xOutput(pOut, buf.aBuf, buf.nBuf);
buf.nBuf = 0;
}
}
if( rc==SQLITE_OK ){
*ppOut = buf.aBuf;
*pnOut = buf.nBuf;
}else{
sqlite3_free(buf.aBuf);
if( xOutput ){
if( buf.nBuf>0 ) rc = xOutput(pOut, buf.aBuf, buf.nBuf);
}else{
*ppOut = buf.aBuf;
*pnOut = buf.nBuf;
buf.aBuf = 0;
}
}
sqlite3_free(buf.aBuf);
}
sessionDeleteTable(pList);
return rc;
}
/*
** Combine two changesets together.
*/
int sqlite3changeset_concat(
int nLeft, /* Number of bytes in lhs input */
void *pLeft, /* Lhs input changeset */
int nRight /* Number of bytes in rhs input */,
void *pRight, /* Rhs input changeset */
int *pnOut, /* OUT: Number of bytes in output changeset */
void **ppOut /* OUT: changeset (left <concat> right) */
){
sqlite3_changeset_iter *pIter1 = 0;
sqlite3_changeset_iter *pIter2 = 0;
int rc;
*pnOut = 0;
*ppOut = 0;
rc = sqlite3changeset_start(&pIter1, nLeft, pLeft);
if( rc==SQLITE_OK ){
rc = sqlite3changeset_start(&pIter2, nRight, pRight);
}
if( rc==SQLITE_OK ){
rc = sessionChangesetConcat(pIter1, pIter2, 0, 0, pnOut, ppOut);
}
sqlite3changeset_finalize(pIter1);
sqlite3changeset_finalize(pIter2);
return rc;
}
/*
** Streaming version of sqlite3changeset_concat().
*/
int sqlite3changeset_concat_str(
int (*xInputA)(void *pIn, void *pData, int *pnData),
void *pInA,
int (*xInputB)(void *pIn, void *pData, int *pnData),
void *pInB,
int (*xOutput)(void *pOut, const void *pData, int nData),
void *pOut
){
sqlite3_changeset_iter *pIter1 = 0;
sqlite3_changeset_iter *pIter2 = 0;
int rc;
rc = sqlite3changeset_start_str(&pIter1, xInputA, pInA);
if( rc==SQLITE_OK ){
rc = sqlite3changeset_start_str(&pIter2, xInputB, pInB);
}
if( rc==SQLITE_OK ){
rc = sessionChangesetConcat(pIter1, pIter2, xOutput, pOut, 0, 0);
}
sqlite3changeset_finalize(pIter1);
sqlite3changeset_finalize(pIter2);
return rc;
}
#endif /* SQLITE_ENABLE_SESSION && SQLITE_ENABLE_PREUPDATE_HOOK */