1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-07-29 08:01:23 +03:00

Start adding the sqlite3changeset_concat() function to the session module.

FossilOrigin-Name: 8927b2260b8d84f53776cb29e1d2fa41b6b0de0e
This commit is contained in:
dan
2011-04-14 11:16:21 +00:00
parent 9e3fbc0157
commit 5d607a6e06
6 changed files with 660 additions and 66 deletions

View File

@ -126,6 +126,7 @@ struct SessionTable {
** Followed by one or more changes to the table.
**
** 1 byte: Either SQLITE_INSERT, UPDATE or DELETE.
** 1 byte: The "indirect-change" flag.
** old.* record: (delete and update only)
** new.* record: (insert and update only)
*/
@ -353,20 +354,19 @@ static unsigned int sessionPreupdateHash(
}
/*
** Based on the primary key values stored in change pChange, calculate a
** Based on the primary key values stored in change aRecord, calculate a
** hash key, assuming the has table has nBucket buckets. The hash keys
** calculated by this function are compatible with those calculated by
** sessionPreupdateHash().
*/
static unsigned int sessionChangeHash(
sqlite3 *db, /* Database handle */
SessionTable *pTab, /* Table handle */
SessionChange *pChange, /* Change handle */
u8 *aRecord, /* Change record */
int nBucket /* Assume this many buckets in hash table */
){
unsigned int h = 0; /* Value to return */
int i; /* Used to iterate through columns */
u8 *a = pChange->aRecord; /* Used to iterate through change record */
u8 *a = aRecord; /* Used to iterate through change record */
for(i=0; i<pTab->nCol; i++){
int eType = *a++;
@ -393,6 +393,160 @@ static unsigned int sessionChangeHash(
return (h % nBucket);
}
static int sessionSerialLen(u8 *a){
int e = *a;
int n;
if( e==0 ) return 1;
if( e==SQLITE_NULL ) return 1;
if( e==SQLITE_INTEGER || e==SQLITE_FLOAT ) return 9;
return sessionVarintGet(&a[1], &n) + 1 + n;
}
static int sessionChangeEqual(
SessionTable *pTab,
u8 *aLeft, /* Change record */
u8 *aRight /* Change record */
){
u8 *a1 = aLeft;
u8 *a2 = aRight;
int i;
for(i=0; i<pTab->nCol; i++){
int n1 = sessionSerialLen(a1);
int n2 = sessionSerialLen(a2);
if( pTab->abPK[i] && (n1!=n2 || memcmp(a1, a2, n1)) ){
return 0;
}
a1 += n1;
a2 += n1;
}
return 1;
}
static void sessionMergeRecord(
u8 **paOut,
SessionTable *pTab,
u8 *aLeft,
u8 *aRight
){
u8 *a1 = aLeft;
u8 *a2 = aRight;
u8 *aOut = *paOut;
int i;
for(i=0; i<pTab->nCol; i++){
int n1 = sessionSerialLen(a1);
int n2 = sessionSerialLen(a2);
if( *a2 ){
memcpy(aOut, a2, n2);
aOut += n2;
}else{
memcpy(aOut, a1, n1);
aOut += n1;
}
a1 += n1;
a2 += n2;
}
*paOut = aOut;
}
static u8 *sessionMergeValue(
u8 **paOne,
u8 **paTwo,
int *pnVal
){
u8 *a1 = *paOne;
u8 *a2 = *paTwo;
u8 *pRet = 0;
int n1;
assert( a1 );
if( a2 ){
int n2 = sessionSerialLen(a2);
if( *a2 ){
*pnVal = n2;
pRet = a2;
}
*paTwo = &a2[n2];
}
n1 = sessionSerialLen(a1);
if( pRet==0 ){
*pnVal = n1;
pRet = a1;
}
*paOne = &a1[n1];
return pRet;
}
static int sessionMergeUpdate(
u8 **paOut,
SessionTable *pTab,
u8 *aOldRecord1,
u8 *aOldRecord2,
u8 *aNewRecord1,
u8 *aNewRecord2
){
u8 *aOld1 = aOldRecord1;
u8 *aOld2 = aOldRecord2;
u8 *aNew1 = aNewRecord1;
u8 *aNew2 = aNewRecord2;
u8 *aOut = *paOut;
int i;
int bRequired = 0;
assert( aOldRecord1 && aNewRecord1 );
/* Write the old.* vector first. */
for(i=0; i<pTab->nCol; i++){
int nOld;
u8 *aOld;
int nNew;
u8 *aNew;
aOld = sessionMergeValue(&aOld1, &aOld2, &nOld);
aNew = sessionMergeValue(&aNew1, &aNew2, &nNew);
if( pTab->abPK[i] || nOld!=nNew || memcmp(aOld, aNew, nNew) ){
if( pTab->abPK[i]==0 ) bRequired = 1;
memcpy(aOut, aOld, nOld);
aOut += nOld;
}else{
*(aOut++) = '\0';
}
}
if( !bRequired ) return 0;
/* Write the new.* vector */
aOld1 = aOldRecord1;
aOld2 = aOldRecord2;
aNew1 = aNewRecord1;
aNew2 = aNewRecord2;
for(i=0; i<pTab->nCol; i++){
int nOld;
u8 *aOld;
int nNew;
u8 *aNew;
aOld = sessionMergeValue(&aOld1, &aOld2, &nOld);
aNew = sessionMergeValue(&aNew1, &aNew2, &nNew);
if( pTab->abPK[i] || (nOld==nNew && 0==memcmp(aOld, aNew, nNew)) ){
*(aOut++) = '\0';
}else{
memcpy(aOut, aNew, nNew);
aOut += nNew;
}
}
*paOut = aOut;
return 1;
}
static int sessionPreupdateEqual(
sqlite3 *db,
SessionTable *pTab,
@ -480,7 +634,7 @@ static int sessionPreupdateEqual(
** Growing the hash table in this case is a performance optimization only,
** it is not required for correct operation.
*/
static int sessionGrowHash(sqlite3_session *pSession, SessionTable *pTab){
static int sessionGrowHash(SessionTable *pTab){
if( pTab->nChange==0 || pTab->nEntry>=(pTab->nChange/2) ){
int i;
SessionChange **apNew;
@ -489,7 +643,6 @@ static int sessionGrowHash(sqlite3_session *pSession, SessionTable *pTab){
apNew = (SessionChange **)sqlite3_malloc(sizeof(SessionChange *) * nNew);
if( apNew==0 ){
if( pTab->nChange==0 ){
pSession->rc = SQLITE_NOMEM;
return SQLITE_ERROR;
}
return SQLITE_OK;
@ -500,7 +653,7 @@ static int sessionGrowHash(sqlite3_session *pSession, SessionTable *pTab){
SessionChange *p;
SessionChange *pNext;
for(p=pTab->apChange[i]; p; p=pNext){
int iHash = sessionChangeHash(pSession->db, pTab, p, nNew);
int iHash = sessionChangeHash(pTab, p->aRecord, nNew);
pNext = p->pNext;
p->pNext = apNew[iHash];
apNew[iHash] = p;
@ -677,7 +830,10 @@ static void sessionPreupdateOneChange(
if( sessionInitTable(pSession, pTab) ) return;
/* Grow the hash table if required */
if( sessionGrowHash(pSession, pTab) ) return;
if( sessionGrowHash(pTab) ){
pSession->rc = SQLITE_NOMEM;
return;
}
/* Search the hash table for an existing entry for rowid=iKey2. If
** one is found, store a pointer to it in pChange and unlink it from
@ -693,8 +849,8 @@ static void sessionPreupdateOneChange(
}
if( pC==0 ){
/* Create a new change object containing all the old values (if
** this is an SQLITE_UPDATE or SQLITE_DELETE), or just the PK
** values (if this is an INSERT). */
** this is an SQLITE_UPDATE or SQLITE_DELETE), or just the PK
** values (if this is an INSERT). */
SessionChange *pChange; /* New change object */
int nByte; /* Number of bytes to allocate */
int i; /* Used to iterate through columns */
@ -851,6 +1007,27 @@ int sqlite3session_create(
return SQLITE_OK;
}
void sessionDeleteTable(SessionTable *pList){
SessionTable *pNext;
SessionTable *pTab;
for(pTab=pList; pTab; pTab=pNext){
int i;
pNext = pTab->pNext;
for(i=0; i<pTab->nChange; i++){
SessionChange *p;
SessionChange *pNext;
for(p=pTab->apChange[i]; p; p=pNext){
pNext = p->pNext;
sqlite3_free(p);
}
}
sqlite3_free((char*)pTab->azCol); /* cast works around VC++ bug */
sqlite3_free(pTab->apChange);
sqlite3_free(pTab);
}
}
/*
** Delete a session object previously allocated using sqlite3session_create().
*/
@ -870,22 +1047,7 @@ void sqlite3session_delete(sqlite3_session *pSession){
/* Delete all attached table objects. And the contents of their
** associated hash-tables. */
while( pSession->pTable ){
int i;
SessionTable *pTab = pSession->pTable;
pSession->pTable = pTab->pNext;
for(i=0; i<pTab->nChange; i++){
SessionChange *p;
SessionChange *pNext;
for(p=pTab->apChange[i]; p; p=pNext){
pNext = p->pNext;
sqlite3_free(p);
}
}
sqlite3_free((char*)pTab->azCol); /* cast works around VC++ bug */
sqlite3_free(pTab->apChange);
sqlite3_free(pTab);
}
sessionDeleteTable(pSession->pTable);
/* Free the session object itself. */
sqlite3_free(pSession);
@ -1322,6 +1484,18 @@ static int sessionSelectBind(
return rc;
}
static void sessionAppendTableHdr(
SessionBuffer *pBuf,
SessionTable *pTab,
int *pRc
){
/* Write a table header */
sessionAppendByte(pBuf, 'T', pRc);
sessionAppendVarint(pBuf, pTab->nCol, pRc);
sessionAppendBlob(pBuf, pTab->abPK, pTab->nCol, pRc);
sessionAppendBlob(pBuf, (u8 *)pTab->zName, strlen(pTab->zName)+1, pRc);
}
/*
** Obtain a changeset object containing all changes recorded by the
** session object passed as the first argument.
@ -1369,10 +1543,7 @@ int sqlite3session_changeset(
}
/* Write a table header */
sessionAppendByte(&buf, 'T', &rc);
sessionAppendVarint(&buf, nCol, &rc);
sessionAppendBlob(&buf, pTab->abPK, nCol, &rc);
sessionAppendBlob(&buf, (u8 *)zName, sqlite3Strlen30(zName)+1, &rc);
sessionAppendTableHdr(&buf, pTab, &rc);
/* Build and compile a statement to execute: */
if( rc==SQLITE_OK ){
@ -1525,9 +1696,9 @@ static int sessionReadRecord(
if( eType==SQLITE_TEXT || eType==SQLITE_BLOB ){
int nByte;
int enc = (eType==SQLITE_TEXT ? SQLITE_UTF8 : 0);
aRec += sessionVarintGet(aRec, &nByte);
if( apOut ){
int enc = (eType==SQLITE_TEXT ? SQLITE_UTF8 : 0);
sqlite3ValueSetStr(apOut[i], nByte, aRec, enc, SQLITE_STATIC);
}
aRec += nByte;
@ -1552,23 +1723,21 @@ static int sessionReadRecord(
return SQLITE_OK;
}
/*
** Advance an iterator created by sqlite3changeset_start() to the next
** change in the changeset. This function may return SQLITE_ROW, SQLITE_DONE
** or SQLITE_CORRUPT.
**
** This function may not be called on iterators passed to a conflict handler
** callback by changeset_apply().
*/
int sqlite3changeset_next(sqlite3_changeset_iter *p){
static int sessionChangesetNext(
sqlite3_changeset_iter *p,
u8 **paRec,
int *pnRec
){
u8 *aChange;
int i;
u8 c;
assert( (paRec==0 && pnRec==0) || (paRec && pnRec) );
/* If the iterator is in the error-state, return immediately. */
if( p->rc!=SQLITE_OK ) return p->rc;
/* Free the current contents of p->apValue[]. */
/* Free the current contents of p->apValue[], if any. */
if( p->apValue ){
for(i=0; i<p->nCol*2; i++){
sqlite3ValueFree(p->apValue[i]);
@ -1582,47 +1751,64 @@ int sqlite3changeset_next(sqlite3_changeset_iter *p){
}
aChange = p->pNext;
c = *(aChange++);
if( c=='T' ){
if( aChange[0]=='T' ){
int nByte; /* Bytes to allocate for apValue */
aChange++;
aChange += sessionVarintGet(aChange, &p->nCol);
p->abPK = (u8 *)aChange;
aChange += p->nCol;
p->zTab = (char *)aChange;
aChange += (sqlite3Strlen30((char *)aChange) + 1);
p->op = *(aChange++);
p->bIndirect = *(aChange++);
sqlite3_free(p->apValue);
nByte = sizeof(sqlite3_value *) * p->nCol * 2;
p->apValue = (sqlite3_value **)sqlite3_malloc(nByte);
if( !p->apValue ){
return (p->rc = SQLITE_NOMEM);
if( paRec==0 ){
sqlite3_free(p->apValue);
nByte = sizeof(sqlite3_value *) * p->nCol * 2;
p->apValue = (sqlite3_value **)sqlite3_malloc(nByte);
if( !p->apValue ){
return (p->rc = SQLITE_NOMEM);
}
memset(p->apValue, 0, sizeof(sqlite3_value*)*p->nCol*2);
}
memset(p->apValue, 0, sizeof(sqlite3_value*)*p->nCol*2);
}else{
p->op = c;
p->bIndirect = *(aChange++);
}
p->op = *(aChange++);
p->bIndirect = *(aChange++);
if( p->op!=SQLITE_UPDATE && p->op!=SQLITE_DELETE && p->op!=SQLITE_INSERT ){
return (p->rc = SQLITE_CORRUPT);
}
if( paRec ){ *paRec = aChange; }
/* If this is an UPDATE or DELETE, read the old.* record. */
if( p->op!=SQLITE_INSERT ){
p->rc = sessionReadRecord(&aChange, p->nCol, p->apValue);
p->rc = sessionReadRecord(&aChange, p->nCol, paRec?0:p->apValue);
if( p->rc!=SQLITE_OK ) return p->rc;
}
/* If this is an INSERT or UPDATE, read the new.* record. */
if( p->op!=SQLITE_DELETE ){
p->rc = sessionReadRecord(&aChange, p->nCol, &p->apValue[p->nCol]);
p->rc = sessionReadRecord(&aChange, p->nCol, paRec?0:&p->apValue[p->nCol]);
if( p->rc!=SQLITE_OK ) return p->rc;
}
if( pnRec ){ *pnRec = aChange - *paRec; }
p->pNext = aChange;
return SQLITE_ROW;
}
/*
** Advance an iterator created by sqlite3changeset_start() to the next
** change in the changeset. This function may return SQLITE_ROW, SQLITE_DONE
** or SQLITE_CORRUPT.
**
** This function may not be called on iterators passed to a conflict handler
** callback by changeset_apply().
*/
int sqlite3changeset_next(sqlite3_changeset_iter *p){
return sessionChangesetNext(p, 0, 0);
}
/*
** The following function extracts information on the current change
** from a changeset iterator. They may only be called after changeset_next()
@ -2535,4 +2721,245 @@ int sqlite3changeset_apply(
return rc;
}
static int sessionChangeMerge(
SessionTable *pTab,
SessionChange *pExist,
int op2,
int bIndirect,
u8 *aRec,
int nRec,
SessionChange **ppNew
){
SessionChange *pNew = 0;
if( !pExist ){
pNew = (SessionChange *)sqlite3_malloc(sizeof(SessionChange));
if( !pNew ){
return SQLITE_NOMEM;
}
memset(pNew, 0, sizeof(SessionChange));
pNew->bInsert = op2;
pNew->bIndirect = bIndirect;
pNew->nRecord = nRec;
pNew->aRecord = aRec;
}else{
int op1 = pExist->bInsert;
/*
** op1=INSERT, op2=INSERT -> Unsupported. Discard op2.
** op1=INSERT, op2=UPDATE -> INSERT.
** op1=INSERT, op2=DELETE -> (none)
**
** op1=UPDATE, op2=INSERT -> Unsupported. Discard op2.
** op1=UPDATE, op2=UPDATE -> UPDATE.
** op1=UPDATE, op2=DELETE -> DELETE.
**
** op1=DELETE, op2=INSERT -> UPDATE.
** op1=DELETE, op2=UPDATE -> Unsupported. Discard op2.
** op1=DELETE, op2=DELETE -> Unsupported. Discard op2.
*/
if( (op1==SQLITE_INSERT && op2==SQLITE_INSERT)
|| (op1==SQLITE_UPDATE && op2==SQLITE_INSERT)
|| (op1==SQLITE_DELETE && op2==SQLITE_UPDATE)
|| (op1==SQLITE_DELETE && op2==SQLITE_DELETE)
){
pNew = pExist;
}else if( op1==SQLITE_INSERT && op2==SQLITE_DELETE ){
sqlite3_free(pExist);
assert( pNew==0 );
}else{
int nByte;
u8 *aCsr;
nByte = sizeof(SessionChange) + pExist->nRecord + nRec;
pNew = (SessionChange *)sqlite3_malloc(nByte);
if( !pNew ){
return SQLITE_NOMEM;
}
memset(pNew, 0, sizeof(SessionChange));
pNew->bIndirect = (bIndirect && pExist->bIndirect);
aCsr = pNew->aRecord = (u8 *)&pNew[1];
if( op1==SQLITE_INSERT && op2==SQLITE_UPDATE ){
u8 *a1 = aRec;
pNew->bInsert = SQLITE_INSERT;
sessionReadRecord(&a1, pTab->nCol, 0);
sessionMergeRecord(&aCsr, pTab, pExist->aRecord, a1);
}
else if( op1==SQLITE_UPDATE && op2==SQLITE_UPDATE ){
u8 *a1 = pExist->aRecord;
u8 *a2 = aRec;
sessionReadRecord(&a1, pTab->nCol, 0);
sessionReadRecord(&a2, pTab->nCol, 0);
pNew->bInsert = SQLITE_UPDATE;
if( 0==sessionMergeUpdate(&aCsr, pTab, aRec, pExist->aRecord, a1, a2) ){
sqlite3_free(pNew);
pNew = 0;
}
}
else if( op1==SQLITE_UPDATE && op2==SQLITE_DELETE ){
pNew->bInsert = SQLITE_DELETE;
sessionMergeRecord(&aCsr, pTab, aRec, pExist->aRecord);
}
else if( op1==SQLITE_DELETE && op2==SQLITE_INSERT ){
pNew->bInsert = SQLITE_UPDATE;
if( 0==sessionMergeUpdate(&aCsr, pTab, pExist->aRecord, 0, aRec, 0) ){
sqlite3_free(pNew);
pNew = 0;
}
}
if( pNew ){
pNew->nRecord = (aCsr - pNew->aRecord);
}
sqlite3_free(pExist);
}
}
*ppNew = pNew;
return SQLITE_OK;
}
int sessionConcatChangeset(
int nChangeset,
void *pChangeset,
SessionTable **ppTabList
){
u8 *aRec;
int nRec;
sqlite3_changeset_iter *pIter;
int rc;
SessionTable *pTab = 0;
rc = sqlite3changeset_start(&pIter, nChangeset, pChangeset);
if( rc!=SQLITE_OK ) return rc;
while( SQLITE_ROW==sessionChangesetNext(pIter, &aRec, &nRec) ){
const char *zNew;
int nCol;
int op;
int iHash;
int bIndirect;
SessionChange *pChange;
SessionChange *pExist = 0;
SessionChange **pp;
assert( pIter->apValue==0 );
sqlite3changeset_op(pIter, &zNew, &nCol, &op, &bIndirect);
if( !pTab || zNew!=pTab->zName ){
/* Search the list for a matching table */
int nNew = strlen(zNew);
for(pTab = *ppTabList; pTab; pTab=pTab->pNext){
if( 0==sqlite3_strnicmp(pTab->zName, zNew, nNew+1) ) break;
}
if( !pTab ){
pTab = sqlite3_malloc(sizeof(SessionTable));
if( !pTab ) break;
memset(pTab, 0, sizeof(SessionTable));
pTab->pNext = *ppTabList;
*ppTabList = pTab;
}
pTab->zName = (char *)zNew;
pTab->nCol = nCol;
sqlite3changeset_pk(pIter, &pTab->abPK, 0);
}
if( sessionGrowHash(pTab) ) break;
iHash = sessionChangeHash(pTab, aRec, pTab->nChange);
/* Search for existing entry. If found, remove it from the hash table.
** Code below may link it back in.
*/
for(pp=&pTab->apChange[iHash]; *pp; pp=&(*pp)->pNext){
if( sessionChangeEqual(pTab, (*pp)->aRecord, aRec) ){
pExist = *pp;
*pp = (*pp)->pNext;
pTab->nEntry--;
break;
}
}
rc = sessionChangeMerge(pTab, pExist, op, bIndirect, aRec, nRec, &pChange);
if( rc ) break;
if( pChange ){
pChange->pNext = pTab->apChange[iHash];
pTab->apChange[iHash] = pChange;
pTab->nEntry++;
}
}
if( rc==SQLITE_OK ){
rc = sqlite3changeset_finalize(pIter);
}else{
sqlite3changeset_finalize(pIter);
}
return rc;
}
/*
** 1. Iterate through the left-hand changeset. Add an entry to a table
** specific hash table for each change in the changeset. The hash table
** key is the PK of the row affected by the change.
**
** 2. Then interate through the right-hand changeset. Attempt to add an
** entry to a hash table for each component change. If a change already
** exists with the same PK values, combine the two into a single change.
**
** 3. Write an output changeset based on the contents of the hash table.
*/
int sqlite3changeset_concat(
int nLeft, /* Number of bytes in lhs input */
void *pLeft, /* Lhs input changeset */
int nRight /* Number of bytes in rhs input */,
void *pRight, /* Rhs input changeset */
int *pnOut, /* OUT: Number of bytes in output changeset */
void **ppOut /* OUT: changeset (left <concat> right) */
){
SessionTable *pList = 0; /* List of SessionTable objects */
int rc; /* Return code */
*pnOut = 0;
*ppOut = 0;
rc = sessionConcatChangeset(nLeft, pLeft, &pList);
if( rc==SQLITE_OK ){
rc = sessionConcatChangeset(nRight, pRight, &pList);
}
/* Create the serialized output changeset based on the contents of the
** hash tables attached to the SessionTable objects in list pList.
*/
if( rc==SQLITE_OK ){
SessionTable *pTab;
SessionBuffer buf = {0, 0, 0};
for(pTab=pList; pTab; pTab=pTab->pNext){
int i;
if( pTab->nEntry==0 ) continue;
sessionAppendTableHdr(&buf, pTab, &rc);
for(i=0; i<pTab->nChange; i++){
SessionChange *p;
for(p=pTab->apChange[i]; p; p=p->pNext){
sessionAppendByte(&buf, p->bInsert, &rc);
sessionAppendByte(&buf, p->bIndirect, &rc);
sessionAppendBlob(&buf, p->aRecord, p->nRecord, &rc);
}
}
}
if( rc==SQLITE_OK ){
*ppOut = buf.aBuf;
*pnOut = buf.nBuf;
}else{
sqlite3_free(buf.aBuf);
}
}
concat_out:
sessionDeleteTable(pList);
return rc;
}
#endif /* SQLITE_ENABLE_SESSION && SQLITE_ENABLE_PREUPDATE_HOOK */