1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-08-01 06:27:03 +03:00

Add the sqlite3changegroup_xxx() APIs to the sessions module. For combining multiple changesets or patchsets.

FossilOrigin-Name: 0c1a901cd60e557fc676b97625243163dfe9be9d
This commit is contained in:
dan
2015-06-11 17:26:10 +00:00
parent 807547038d
commit 5898ad6954
4 changed files with 318 additions and 125 deletions

View File

@ -4045,6 +4045,15 @@ int sqlite3changeset_apply_strm(
return rc;
}
/*
** sqlite3_changegroup handle.
*/
struct sqlite3_changegroup {
int rc; /* Error code */
int bPatch; /* True to accumulate patchsets */
SessionTable *pList; /* List of tables in current patch */
};
/*
** This function is called to merge two changes to the same row together as
** part of an sqlite3changeset_concat() operation. A new change object is
@ -4170,18 +4179,19 @@ static int sessionChangeMerge(
}
/*
** Add all changes in the changeset passed via the first two arguments to
** hash tables.
** Add all changes in the changeset traversed by the iterator passed as
** the first argument to the changegroup hash tables.
*/
static int sessionChangesetToHash(
sqlite3_changeset_iter *pIter, /* Iterator to read from */
SessionTable **ppTabList /* IN/OUT: List of table objects */
sqlite3_changegroup *pGrp /* Changegroup object to add changeset to */
){
u8 *aRec;
int nRec;
int rc = SQLITE_OK;
SessionTable *pTab = 0;
while( SQLITE_ROW==sessionChangesetNext(pIter, &aRec, &nRec) ){
const char *zNew;
int nCol;
@ -4192,6 +4202,13 @@ static int sessionChangesetToHash(
SessionChange *pExist = 0;
SessionChange **pp;
if( pGrp->pList==0 ){
pGrp->bPatch = pIter->bPatchset;
}else if( pIter->bPatchset!=pGrp->bPatch ){
rc = SQLITE_ERROR;
break;
}
sqlite3changeset_op(pIter, &zNew, &nCol, &op, &bIndirect);
if( !pTab || sqlite3_stricmp(zNew, pTab->zName) ){
/* Search the list for a matching table */
@ -4199,7 +4216,7 @@ static int sessionChangesetToHash(
u8 *abPK;
sqlite3changeset_pk(pIter, &abPK, 0);
for(pTab = *ppTabList; pTab; pTab=pTab->pNext){
for(pTab = pGrp->pList; pTab; pTab=pTab->pNext){
if( 0==sqlite3_strnicmp(pTab->zName, zNew, nNew+1) ) break;
}
if( !pTab ){
@ -4209,13 +4226,13 @@ static int sessionChangesetToHash(
break;
}
memset(pTab, 0, sizeof(SessionTable));
pTab->pNext = *ppTabList;
pTab->pNext = pGrp->pList;
pTab->nCol = nCol;
pTab->abPK = (u8*)&pTab[1];
memcpy(pTab->abPK, abPK, nCol);
pTab->zName = (char*)&pTab->abPK[nCol];
memcpy(pTab->zName, zNew, nNew+1);
*ppTabList = pTab;
pGrp->pList = pTab;
}else if( pTab->nCol!=nCol || memcmp(pTab->abPK, abPK, nCol) ){
rc = SQLITE_SCHEMA;
break;
@ -4262,54 +4279,45 @@ static int sessionChangesetToHash(
if( rc==SQLITE_OK ) rc = pIter->rc;
return rc;
}
/*
** 1. Iterate through the left-hand changeset. Add an entry to a table
** specific hash table for each change in the changeset. The hash table
** key is the PK of the row affected by the change.
/*
** Serialize a changeset (or patchset) based on all changesets (or patchsets)
** added to the changegroup object passed as the first argument.
**
** 2. Then interate through the right-hand changeset. Attempt to add an
** entry to a hash table for each component change. If a change already
** exists with the same PK values, combine the two into a single change.
** If xOutput is not NULL, then the changeset/patchset is returned to the
** user via one or more calls to xOutput, as with the other streaming
** interfaces.
**
** 3. Write an output changeset based on the contents of the hash table.
** Or, if xOutput is NULL, then (*ppOut) is populated with a pointer to a
** buffer containing the output changeset before this function returns. In
** this case (*pnOut) is set to the size of the output buffer in bytes. It
** is the responsibility of the caller to free the output buffer using
** sqlite3_free() when it is no longer required.
**
** If successful, SQLITE_OK is returned. Or, if an error occurs, an SQLite
** error code. If an error occurs and xOutput is NULL, (*ppOut) and (*pnOut)
** are both set to 0 before returning.
*/
static int sessionChangesetConcat(
sqlite3_changeset_iter *pLeft,
sqlite3_changeset_iter *pRight,
static int sessionChangegroupOutput(
sqlite3_changegroup *pGrp,
int (*xOutput)(void *pOut, const void *pData, int nData),
void *pOut,
int *pnOut,
void **ppOut
){
SessionTable *pList = 0; /* List of SessionTable objects */
int rc; /* Return code */
int bPatch; /* True for a patchset */
SessionTable *pTab;
int rc = SQLITE_OK;
SessionBuffer buf = {0, 0, 0};
SessionTable *pTab;
assert( xOutput==0 || (ppOut==0 && pnOut==0) );
assert( pLeft->zTab==0 && pRight->zTab==0 );
rc = sessionChangesetToHash(pLeft, &pList);
assert( pLeft->zTab || pList==0 );
if( rc==SQLITE_OK ){
rc = sessionChangesetToHash(pRight, &pList);
}
bPatch = pLeft->bPatchset || pRight->bPatchset;
if( pLeft->zTab && pRight->zTab && pLeft->bPatchset!=pRight->bPatchset ){
rc = SQLITE_ERROR;
}
/* Create the serialized output changeset based on the contents of the
** hash tables attached to the SessionTable objects in list pList.
** hash tables attached to the SessionTable objects in list p->pList.
*/
for(pTab=pList; rc==SQLITE_OK && pTab; pTab=pTab->pNext){
for(pTab=pGrp->pList; rc==SQLITE_OK && pTab; pTab=pTab->pNext){
int i;
if( pTab->nEntry==0 ) continue;
sessionAppendTableHdr(&buf, bPatch, pTab, &rc);
sessionAppendTableHdr(&buf, pGrp->bPatch, pTab, &rc);
for(i=0; i<pTab->nChange; i++){
SessionChange *p;
for(p=pTab->apChange[i]; p; p=p->pNext){
@ -4336,10 +4344,93 @@ static int sessionChangesetConcat(
}
sqlite3_free(buf.aBuf);
sessionDeleteTable(pList);
return rc;
}
/*
** Allocate a new, empty, sqlite3_changegroup.
*/
int sqlite3changegroup_new(sqlite3_changegroup **pp){
int rc = SQLITE_OK; /* Return code */
sqlite3_changegroup *p; /* New object */
p = (sqlite3_changegroup*)sqlite3_malloc(sizeof(sqlite3_changegroup));
if( p==0 ){
rc = SQLITE_NOMEM;
}else{
memset(p, 0, sizeof(sqlite3_changegroup));
}
*pp = p;
return rc;
}
/*
** Add the changeset currently stored in buffer pData, size nData bytes,
** to changeset-group p.
*/
int sqlite3changegroup_add(sqlite3_changegroup *pGrp, int nData, void *pData){
sqlite3_changeset_iter *pIter; /* Iterator opened on pData/nData */
int rc; /* Return code */
rc = sqlite3changeset_start(&pIter, nData, pData);
if( rc==SQLITE_OK ){
rc = sessionChangesetToHash(pIter, pGrp);
}
sqlite3changeset_finalize(pIter);
return rc;
}
/*
** Obtain a buffer containing a changeset representing the concatenation
** of all changesets added to the group so far.
*/
int sqlite3changegroup_output(
sqlite3_changegroup *pGrp,
int *pnData,
void **ppData
){
return sessionChangegroupOutput(pGrp, 0, 0, pnData, ppData);
}
/*
** Streaming versions of changegroup_add().
*/
int sqlite3changegroup_add_strm(
sqlite3_changegroup *pGrp,
int (*xInput)(void *pIn, void *pData, int *pnData),
void *pIn
){
sqlite3_changeset_iter *pIter; /* Iterator opened on pData/nData */
int rc; /* Return code */
rc = sqlite3changeset_start_strm(&pIter, xInput, pIn);
if( rc==SQLITE_OK ){
rc = sessionChangesetToHash(pIter, pGrp);
}
sqlite3changeset_finalize(pIter);
return rc;
}
/*
** Streaming versions of changegroup_output().
*/
int sqlite3changegroup_output_strm(
sqlite3_changegroup *pGrp,
int (*xOutput)(void *pOut, const void *pData, int nData),
void *pOut
){
return sessionChangegroupOutput(pGrp, xOutput, pOut, 0, 0);
}
/*
** Delete a changegroup object.
*/
void sqlite3changegroup_delete(sqlite3_changegroup *pGrp){
if( pGrp ){
sessionDeleteTable(pGrp->pList);
sqlite3_free(pGrp);
}
}
/*
** Combine two changesets together.
*/
@ -4351,22 +4442,21 @@ int sqlite3changeset_concat(
int *pnOut, /* OUT: Number of bytes in output changeset */
void **ppOut /* OUT: changeset (left <concat> right) */
){
sqlite3_changeset_iter *pIter1 = 0;
sqlite3_changeset_iter *pIter2 = 0;
sqlite3_changegroup *pGrp;
int rc;
*pnOut = 0;
*ppOut = 0;
rc = sqlite3changeset_start(&pIter1, nLeft, pLeft);
rc = sqlite3changegroup_new(&pGrp);
if( rc==SQLITE_OK ){
rc = sqlite3changeset_start(&pIter2, nRight, pRight);
rc = sqlite3changegroup_add(pGrp, nLeft, pLeft);
}
if( rc==SQLITE_OK ){
rc = sessionChangesetConcat(pIter1, pIter2, 0, 0, pnOut, ppOut);
rc = sqlite3changegroup_add(pGrp, nRight, pRight);
}
if( rc==SQLITE_OK ){
rc = sqlite3changegroup_output(pGrp, pnOut, ppOut);
}
sqlite3changegroup_delete(pGrp);
sqlite3changeset_finalize(pIter1);
sqlite3changeset_finalize(pIter2);
return rc;
}
@ -4381,20 +4471,21 @@ int sqlite3changeset_concat_strm(
int (*xOutput)(void *pOut, const void *pData, int nData),
void *pOut
){
sqlite3_changeset_iter *pIter1 = 0;
sqlite3_changeset_iter *pIter2 = 0;
sqlite3_changegroup *pGrp;
int rc;
rc = sqlite3changeset_start_strm(&pIter1, xInputA, pInA);
rc = sqlite3changegroup_new(&pGrp);
if( rc==SQLITE_OK ){
rc = sqlite3changeset_start_strm(&pIter2, xInputB, pInB);
rc = sqlite3changegroup_add_strm(pGrp, xInputA, pInA);
}
if( rc==SQLITE_OK ){
rc = sessionChangesetConcat(pIter1, pIter2, xOutput, pOut, 0, 0);
rc = sqlite3changegroup_add_strm(pGrp, xInputB, pInB);
}
if( rc==SQLITE_OK ){
rc = sqlite3changegroup_output_strm(pGrp, xOutput, pOut);
}
sqlite3changegroup_delete(pGrp);
sqlite3changeset_finalize(pIter1);
sqlite3changeset_finalize(pIter2);
return rc;
}