1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-07-24 22:22:08 +03:00

Add largely untested APIs for rebasing changesets.

FossilOrigin-Name: 39915b683b3f8d3bf872af1dede96bf2818b488a8638a1d248395023fc4bd0ef
This commit is contained in:
dan
2018-03-14 21:06:58 +00:00
parent a38e6c57bc
commit c0a499eaad
5 changed files with 314 additions and 22 deletions

View File

@ -2918,7 +2918,8 @@ static int sessionChangesetReadTblhdr(sqlite3_changeset_iter *p){
static int sessionChangesetNext(
sqlite3_changeset_iter *p, /* Changeset iterator */
u8 **paRec, /* If non-NULL, store record pointer here */
int *pnRec /* If non-NULL, store size of record here */
int *pnRec, /* If non-NULL, store size of record here */
int *pbNew /* If non-NULL, true if new table */
){
int i;
u8 op;
@ -2953,6 +2954,7 @@ static int sessionChangesetNext(
op = p->in.aData[p->in.iNext++];
while( op=='T' || op=='P' ){
if( pbNew ) *pbNew = 1;
p->bPatchset = (op=='P');
if( sessionChangesetReadTblhdr(p) ) return p->rc;
if( (p->rc = sessionInputBuffer(&p->in, 2)) ) return p->rc;
@ -3031,7 +3033,7 @@ static int sessionChangesetNext(
** callback by changeset_apply().
*/
int sqlite3changeset_next(sqlite3_changeset_iter *p){
return sessionChangesetNext(p, 0, 0);
return sessionChangesetNext(p, 0, 0, 0);
}
/*
@ -4522,6 +4524,7 @@ struct sqlite3_changegroup {
*/
static int sessionChangeMerge(
SessionTable *pTab, /* Table structure */
int bRebase, /* True for a rebase hash-table */
int bPatchset, /* True for patchsets */
SessionChange *pExist, /* Existing change */
int op2, /* Second change operation */
@ -4543,6 +4546,8 @@ static int sessionChangeMerge(
pNew->nRecord = nRec;
pNew->aRecord = (u8*)&pNew[1];
memcpy(pNew->aRecord, aRec, nRec);
}else if( bRebase){
assert( 0 );
}else{
int op1 = pExist->op;
@ -4645,7 +4650,8 @@ static int sessionChangeMerge(
*/
static int sessionChangesetToHash(
sqlite3_changeset_iter *pIter, /* Iterator to read from */
sqlite3_changegroup *pGrp /* Changegroup object to add changeset to */
sqlite3_changegroup *pGrp, /* Changegroup object to add changeset to */
int bRebase /* True if hash table is for rebasing */
){
u8 *aRec;
int nRec;
@ -4653,7 +4659,7 @@ static int sessionChangesetToHash(
SessionTable *pTab = 0;
while( SQLITE_ROW==sessionChangesetNext(pIter, &aRec, &nRec) ){
while( SQLITE_ROW==sessionChangesetNext(pIter, &aRec, &nRec, 0) ){
const char *zNew;
int nCol;
int op;
@ -4733,7 +4739,7 @@ static int sessionChangesetToHash(
}
}
rc = sessionChangeMerge(pTab,
rc = sessionChangeMerge(pTab, bRebase,
pIter->bPatchset, pExist, op, bIndirect, aRec, nRec, &pChange
);
if( rc ) break;
@ -4841,7 +4847,7 @@ int sqlite3changegroup_add(sqlite3_changegroup *pGrp, int nData, void *pData){
rc = sqlite3changeset_start(&pIter, nData, pData);
if( rc==SQLITE_OK ){
rc = sessionChangesetToHash(pIter, pGrp);
rc = sessionChangesetToHash(pIter, pGrp, 0);
}
sqlite3changeset_finalize(pIter);
return rc;
@ -4872,7 +4878,7 @@ int sqlite3changegroup_add_strm(
rc = sqlite3changeset_start_strm(&pIter, xInput, pIn);
if( rc==SQLITE_OK ){
rc = sessionChangesetToHash(pIter, pGrp);
rc = sessionChangesetToHash(pIter, pGrp, 0);
}
sqlite3changeset_finalize(pIter);
return rc;
@ -4957,4 +4963,256 @@ int sqlite3changeset_concat_strm(
return rc;
}
struct sqlite3_rebaser {
sqlite3_changegroup grp; /* Hash table */
};
/*
** Buffers a1 and a2 must both contain a sessions module record nCol
** fields in size. This function appends an nCol sessions module
** record to buffer pBuf that is a copy of a1, except that:
**
** + If bUndefined is 0, for each field that is not "undefined" in either
** a1[] or a2[], swap in the field from a2[].
**
** + If bUndefined is 1, for each field that is "undefined" in a1[]
** swap in the field from a2[].
*/
static void sessionAppendRecordMerge(
SessionBuffer *pBuf,
int nCol,
int bUndefined,
u8 *a1, int n1,
u8 *a2, int n2,
int *pRc
){
sessionBufferGrow(pBuf, n1+n2, pRc);
if( *pRc==SQLITE_OK ){
int i;
u8 *pOut = &pBuf->aBuf[pBuf->nBuf];
for(i=0; i<nCol; i++){
int nn1 = sessionSerialLen(a1);
int nn2 = sessionSerialLen(a2);
if( bUndefined==0 ){
if( *a1 && *a2 ){
memcpy(pOut, a2, nn2);
pOut += nn2;
}else{
memcpy(pOut, a1, nn1);
pOut += nn1;
}
}else{
if( *a1==0 ){
memcpy(pOut, a2, nn2);
pOut += nn2;
}else{
memcpy(pOut, a1, nn1);
pOut += nn1;
}
}
a1 += n1;
a2 += n2;
}
}
}
static int sessionRebase(
sqlite3_rebaser *p, /* Rebaser hash table */
sqlite3_changeset_iter *pIter, /* Input data */
int (*xOutput)(void *pOut, const void *pData, int nData),
void *pOut, /* Context for xOutput callback */
int *pnOut, /* OUT: Number of bytes in output changeset */
void **ppOut /* OUT: Inverse of pChangeset */
){
int rc = SQLITE_OK;
u8 *aRec = 0;
int nRec = 0;
int bNew = 0;
SessionTable *pTab = 0;
SessionBuffer sOut = {0,0,0};
while( SQLITE_OK==sessionChangesetNext(pIter, &aRec, &nRec, &bNew) ){
SessionChange *pChange = 0;
if( bNew ){
const char *zTab = pIter->zTab;
for(pTab=p->grp.pList; pTab; pTab=pTab->pNext){
if( 0==sqlite3_stricmp(pTab->zName, zTab) ) break;
}
bNew = 0;
/* Append a table header to the output for this new table */
sessionAppendByte(&sOut, pIter->bPatchset ? 'P' : 'T', &rc);
sessionAppendVarint(&sOut, pIter->nCol, &rc);
sessionAppendBlob(&sOut, pIter->abPK, pIter->nCol, &rc);
sessionAppendBlob(&sOut, (u8*)pIter->zTab, strlen(pIter->zTab)+1, &rc);
}
if( pTab ){
int bPkOnly = (pIter->bPatchset && pIter->op==SQLITE_DELETE);
int iHash = sessionChangeHash(pTab, bPkOnly, aRec, pTab->nChange);
for(pChange=pTab->apChange[iHash]; pChange; pChange=pChange->pNext){
if( sessionChangeEqual(pTab, bPkOnly, aRec, 0, pChange->aRecord) ){
break;
}
}
}
if( pChange ){
assert( pChange->op==SQLITE_DELETE || pChange->op==SQLITE_INSERT );
/* If pChange is an INSERT, then rebase the change. If it is a
** DELETE, omit the change from the output altogether. */
if( pChange->op==SQLITE_INSERT ){
if( pChange->bIndirect ){
/* The change being rebased against was a DELETE. So, if the
** input is a:
**
** DELETE - omit the change altogether.
** UPDATE - change to an INSERT,
** INSERT - no change (output the record as is).
*/
if( pIter->op!=SQLITE_DELETE ){
sessionAppendByte(&sOut, SQLITE_INSERT, &rc);
sessionAppendByte(&sOut, pIter->bIndirect, &rc);
if( pIter->op==SQLITE_INSERT ){
sessionAppendBlob(&sOut, aRec, nRec, &rc);
}else{
sessionAppendRecordMerge(&sOut, pIter->nCol, 1,
aRec, nRec, pChange->aRecord, pChange->nRecord, &rc
);
}
}
}else{
sessionAppendByte(&sOut, pIter->op, &rc);
sessionAppendByte(&sOut, pIter->bIndirect, &rc);
if( pIter->op==SQLITE_INSERT ){
sessionAppendBlob(&sOut, aRec, nRec, &rc);
}else{
u8 *pCsr = aRec;
sessionAppendRecordMerge(&sOut, pIter->nCol, 0,
aRec, nRec, pChange->aRecord, pChange->nRecord, &rc
);
if( pIter->op==SQLITE_UPDATE ){
sessionSkipRecord(&pCsr, pIter->nCol);
sessionAppendBlob(&sOut, pCsr, nRec - (pCsr-aRec), &rc);
}
}
}
}
}else{
sessionAppendByte(&sOut, pIter->op, &rc);
sessionAppendByte(&sOut, pIter->bIndirect, &rc);
sessionAppendBlob(&sOut, aRec, nRec, &rc);
}
if( rc==SQLITE_OK && xOutput && sOut.nBuf>SESSIONS_STRM_CHUNK_SIZE ){
rc = xOutput(pOut, sOut.aBuf, sOut.nBuf);
sOut.nBuf = 0;
}
if( rc ) break;
}
if( rc!=SQLITE_OK ){
sqlite3_free(sOut.aBuf);
memset(&sOut, 0, sizeof(sOut));
}
if( rc==SQLITE_OK ){
if( xOutput ){
if( sOut.nBuf>0 ){
rc = xOutput(pOut, sOut.aBuf, sOut.nBuf);
}
}else{
*ppOut = (void*)sOut.aBuf;
*pnOut = sOut.nBuf;
sOut.aBuf = 0;
}
}
sqlite3_free(sOut.aBuf);
return rc;
}
/*
** Create a new rebaser object.
*/
int sqlite3rebaser_create(sqlite3_rebaser **ppNew){
int rc = SQLITE_OK;
sqlite3_rebaser *pNew;
pNew = sqlite3_malloc(sizeof(sqlite3_rebaser));
if( pNew==0 ){
rc = SQLITE_NOMEM;
}
*ppNew = pNew;
return rc;
}
/*
** Call this one or more times to configure a rebaser.
*/
int sqlite3rebaser_configure(
sqlite3_rebaser *p,
int nRebase, const void *pRebase
){
sqlite3_changeset_iter *pIter = 0; /* Iterator opened on pData/nData */
int rc; /* Return code */
rc = sqlite3changeset_start(&pIter, nRebase, (void*)pRebase);
if( rc==SQLITE_OK ){
rc = sessionChangesetToHash(pIter, &p->grp, 1);
}
sqlite3changeset_finalize(pIter);
return rc;
}
/*
** Rebase a changeset according to current rebaser configuration
*/
int sqlite3rebaser_rebase(
sqlite3_rebaser *p,
int nIn, const void *pIn,
int *pnOut, void **ppOut
){
sqlite3_changeset_iter *pIter = 0; /* Iterator to skip through input */
int rc = sqlite3changeset_start(&pIter, nIn, (void*)pIn);
if( rc==SQLITE_OK ){
rc = sessionRebase(p, pIter, 0, 0, pnOut, ppOut);
sqlite3changeset_finalize(pIter);
}
return rc;
}
/*
** Rebase a changeset according to current rebaser configuration
*/
int sqlite3rebaser_rebase_strm(
sqlite3_rebaser *p,
int (*xInput)(void *pIn, void *pData, int *pnData),
void *pIn,
int (*xOutput)(void *pOut, const void *pData, int nData),
void *pOut
){
sqlite3_changeset_iter *pIter = 0; /* Iterator to skip through input */
int rc = sqlite3changeset_start_strm(&pIter, xInput, pIn);
if( rc==SQLITE_OK ){
rc = sessionRebase(p, pIter, xOutput, pOut, 0, 0);
sqlite3changeset_finalize(pIter);
}
return rc;
}
/*
** Destroy a rebaser object
*/
void sqlite3rebaser_destroy(sqlite3_rebaser *p){
if( p ){
sessionDeleteTable(p->grp.pList);
sqlite3_free(p);
}
}
#endif /* SQLITE_ENABLE_SESSION && SQLITE_ENABLE_PREUPDATE_HOOK */