1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-07-30 19:03:16 +03:00

Add new sessions API sqlite3changegroup_add_change().

FossilOrigin-Name: 73647db1ddfdaf40cbf18f1e47c10b4a906489f6d34d7667f0f2ff532f1eb37c
This commit is contained in:
dan
2024-05-04 21:10:24 +00:00
parent c52c00f670
commit 5b80dbe6b3
5 changed files with 253 additions and 106 deletions

View File

@ -3685,14 +3685,14 @@ static int sessionChangesetNextOne(
p->rc = sessionInputBuffer(&p->in, 2);
if( p->rc!=SQLITE_OK ) return p->rc;
sessionDiscardData(&p->in);
p->in.iCurrent = p->in.iNext;
/* If the iterator is already at the end of the changeset, return DONE. */
if( p->in.iNext>=p->in.nData ){
return SQLITE_DONE;
}
sessionDiscardData(&p->in);
p->in.iCurrent = p->in.iNext;
op = p->in.aData[p->in.iNext++];
while( op=='T' || op=='P' ){
if( pbNew ) *pbNew = 1;
@ -5427,6 +5427,7 @@ struct sqlite3_changegroup {
int rc; /* Error code */
int bPatch; /* True to accumulate patchsets */
SessionTable *pList; /* List of tables in current patch */
SessionBuffer rec;
sqlite3 *db; /* Configured by changegroup_schema() */
char *zDb; /* Configured by changegroup_schema() */
@ -5725,108 +5726,128 @@ static int sessionChangesetExtendRecord(
}
/*
** Add all changes in the changeset traversed by the iterator passed as
** the first argument to the changegroup hash tables.
** Locate or create a SessionTable object that may be used to add the
** change currently pointed to by iterator pIter to changegroup pGrp.
** If successful, set output variable (*ppTab) to point to the table
** object and return SQLITE_OK. Otherwise, if some error occurs, return
** an SQLite error code and leave (*ppTab) set to NULL.
*/
static int sessionChangesetToHash(
sqlite3_changeset_iter *pIter, /* Iterator to read from */
sqlite3_changegroup *pGrp, /* Changegroup object to add changeset to */
int bRebase /* True if hash table is for rebasing */
static int sessionChangesetFindTable(
sqlite3_changegroup *pGrp,
const char *zTab,
sqlite3_changeset_iter *pIter,
SessionTable **ppTab
){
u8 *aRec;
int nRec;
int rc = SQLITE_OK;
SessionTable *pTab = 0;
SessionBuffer rec = {0, 0, 0};
int nTab = (int)strlen(zTab);
u8 *abPK = 0;
int nCol = 0;
while( SQLITE_ROW==sessionChangesetNext(pIter, &aRec, &nRec, 0) ){
const char *zNew;
int nCol;
int op;
int iHash;
int bIndirect;
SessionChange *pChange;
SessionChange *pExist = 0;
SessionChange **pp;
*ppTab = 0;
sqlite3changeset_pk(pIter, &abPK, &nCol);
/* Ensure that only changesets, or only patchsets, but not a mixture
** of both, are being combined. It is an error to try to combine a
** changeset and a patchset. */
if( pGrp->pList==0 ){
pGrp->bPatch = pIter->bPatchset;
}else if( pIter->bPatchset!=pGrp->bPatch ){
rc = SQLITE_ERROR;
break;
/* Search the list for an existing table */
for(pTab = pGrp->pList; pTab; pTab=pTab->pNext){
if( 0==sqlite3_strnicmp(pTab->zName, zTab, nTab+1) ) break;
}
/* If one was not found above, create a new table now */
if( !pTab ){
SessionTable **ppNew;
pTab = sqlite3_malloc64(sizeof(SessionTable) + nCol + nTab+1);
if( !pTab ){
return SQLITE_NOMEM;
}
memset(pTab, 0, sizeof(SessionTable));
pTab->nCol = nCol;
pTab->abPK = (u8*)&pTab[1];
memcpy(pTab->abPK, abPK, nCol);
pTab->zName = (char*)&pTab->abPK[nCol];
memcpy(pTab->zName, zTab, nTab+1);
sqlite3changeset_op(pIter, &zNew, &nCol, &op, &bIndirect);
if( !pTab || sqlite3_stricmp(zNew, pTab->zName) ){
/* Search the list for a matching table */
int nNew = (int)strlen(zNew);
u8 *abPK;
sqlite3changeset_pk(pIter, &abPK, 0);
for(pTab = pGrp->pList; pTab; pTab=pTab->pNext){
if( 0==sqlite3_strnicmp(pTab->zName, zNew, nNew+1) ) break;
}
if( !pTab ){
SessionTable **ppTab;
pTab = sqlite3_malloc64(sizeof(SessionTable) + nCol + nNew+1);
if( !pTab ){
rc = SQLITE_NOMEM;
break;
}
memset(pTab, 0, sizeof(SessionTable));
pTab->nCol = nCol;
pTab->abPK = (u8*)&pTab[1];
memcpy(pTab->abPK, abPK, nCol);
pTab->zName = (char*)&pTab->abPK[nCol];
memcpy(pTab->zName, zNew, nNew+1);
if( pGrp->db ){
pTab->nCol = 0;
rc = sessionInitTable(0, pTab, pGrp->db, pGrp->zDb);
if( rc ){
assert( pTab->azCol==0 );
sqlite3_free(pTab);
break;
}
}
/* The new object must be linked on to the end of the list, not
** simply added to the start of it. This is to ensure that the
** tables within the output of sqlite3changegroup_output() are in
** the right order. */
for(ppTab=&pGrp->pList; *ppTab; ppTab=&(*ppTab)->pNext);
*ppTab = pTab;
}
if( !sessionChangesetCheckCompat(pTab, nCol, abPK) ){
rc = SQLITE_SCHEMA;
break;
if( pGrp->db ){
pTab->nCol = 0;
rc = sessionInitTable(0, pTab, pGrp->db, pGrp->zDb);
if( rc ){
assert( pTab->azCol==0 );
sqlite3_free(pTab);
return rc;
}
}
if( nCol<pTab->nCol ){
assert( pGrp->db );
rc = sessionChangesetExtendRecord(pGrp, pTab, nCol, op, aRec, nRec, &rec);
if( rc ) break;
aRec = rec.aBuf;
nRec = rec.nBuf;
}
/* The new object must be linked on to the end of the list, not
** simply added to the start of it. This is to ensure that the
** tables within the output of sqlite3changegroup_output() are in
** the right order. */
for(ppNew=&pGrp->pList; *ppNew; ppNew=&(*ppNew)->pNext);
*ppNew = pTab;
}
if( sessionGrowHash(0, pIter->bPatchset, pTab) ){
rc = SQLITE_NOMEM;
break;
}
/* Check that the table is compatible. */
if( !sessionChangesetCheckCompat(pTab, nCol, abPK) ){
rc = SQLITE_SCHEMA;
}
*ppTab = pTab;
return rc;
}
/*
** Add the change currently indicated by iterator pIter to the hash table
** belonging to changegroup pGrp.
*/
static int sessionOneChangeToHash(
sqlite3_changegroup *pGrp,
sqlite3_changeset_iter *pIter,
int bRebase
){
int rc = SQLITE_OK;
int nCol = 0;
int op = 0;
int iHash = 0;
int bIndirect = 0;
SessionChange *pChange = 0;
SessionChange *pExist = 0;
SessionChange **pp = 0;
SessionTable *pTab = 0;
u8 *aRec = &pIter->in.aData[pIter->in.iCurrent + 2];
int nRec = (pIter->in.iNext - pIter->in.iCurrent) - 2;
/* Ensure that only changesets, or only patchsets, but not a mixture
** of both, are being combined. It is an error to try to combine a
** changeset and a patchset. */
if( pGrp->pList==0 ){
pGrp->bPatch = pIter->bPatchset;
}else if( pIter->bPatchset!=pGrp->bPatch ){
rc = SQLITE_ERROR;
}
if( rc==SQLITE_OK ){
const char *zTab = 0;
sqlite3changeset_op(pIter, &zTab, &nCol, &op, &bIndirect);
rc = sessionChangesetFindTable(pGrp, zTab, pIter, &pTab);
}
if( rc==SQLITE_OK && nCol<pTab->nCol ){
SessionBuffer *pBuf = &pGrp->rec;
rc = sessionChangesetExtendRecord(pGrp, pTab, nCol, op, aRec, nRec, pBuf);
aRec = pBuf->aBuf;
nRec = pBuf->nBuf;
assert( pGrp->db );
}
if( rc==SQLITE_OK && sessionGrowHash(0, pIter->bPatchset, pTab) ){
rc = SQLITE_NOMEM;
}
if( rc==SQLITE_OK ){
/* Search for existing entry. If found, remove it from the hash table.
** Code below may link it back in. */
iHash = sessionChangeHash(
pTab, (pIter->bPatchset && op==SQLITE_DELETE), aRec, pTab->nChange
);
/* Search for existing entry. If found, remove it from the hash table.
** Code below may link it back in.
*/
for(pp=&pTab->apChange[iHash]; *pp; pp=&(*pp)->pNext){
int bPkOnly1 = 0;
int bPkOnly2 = 0;
@ -5841,19 +5862,41 @@ static int sessionChangesetToHash(
break;
}
}
}
if( rc==SQLITE_OK ){
rc = sessionChangeMerge(pTab, bRebase,
pIter->bPatchset, pExist, op, bIndirect, aRec, nRec, &pChange
);
if( rc ) break;
if( pChange ){
pChange->pNext = pTab->apChange[iHash];
pTab->apChange[iHash] = pChange;
pTab->nEntry++;
}
}
if( rc==SQLITE_OK && pChange ){
pChange->pNext = pTab->apChange[iHash];
pTab->apChange[iHash] = pChange;
pTab->nEntry++;
}
if( rc==SQLITE_OK ) rc = pIter->rc;
return rc;
}
/*
** Add all changes in the changeset traversed by the iterator passed as
** the first argument to the changegroup hash tables.
*/
static int sessionChangesetToHash(
sqlite3_changeset_iter *pIter, /* Iterator to read from */
sqlite3_changegroup *pGrp, /* Changegroup object to add changeset to */
int bRebase /* True if hash table is for rebasing */
){
u8 *aRec;
int nRec;
int rc = SQLITE_OK;
while( SQLITE_ROW==(sessionChangesetNext(pIter, &aRec, &nRec, 0)) ){
rc = sessionOneChangeToHash(pGrp, pIter, bRebase);
if( rc!=SQLITE_OK ) break;
}
sqlite3_free(rec.aBuf);
if( rc==SQLITE_OK ) rc = pIter->rc;
return rc;
}
@ -5981,6 +6024,20 @@ int sqlite3changegroup_add(sqlite3_changegroup *pGrp, int nData, void *pData){
return rc;
}
/*
** Add a single change to a changeset-group.
*/
int sqlite3changegroup_add_change(
sqlite3_changegroup *pGrp,
sqlite3_changeset_iter *pIter
){
if( pIter->in.iCurrent==pIter->in.iNext || pIter->rc!=SQLITE_OK ){
/* Iterator does not point to any valid entry. */
return SQLITE_ERROR;
}
return sessionChangesetToHash(pIter, pGrp, 0);
}
/*
** Obtain a buffer containing a changeset representing the concatenation
** of all changesets added to the group so far.
@ -6030,6 +6087,7 @@ void sqlite3changegroup_delete(sqlite3_changegroup *pGrp){
if( pGrp ){
sqlite3_free(pGrp->zDb);
sessionDeleteTable(0, pGrp->pList);
sqlite3_free(pGrp->rec.aBuf);
sqlite3_free(pGrp);
}
}
@ -6431,6 +6489,7 @@ int sqlite3rebaser_rebase_strm(
void sqlite3rebaser_delete(sqlite3_rebaser *p){
if( p ){
sessionDeleteTable(0, p->grp.pList);
sqlite3_free(p->grp.rec.aBuf);
sqlite3_free(p);
}
}