1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-07-27 20:41:58 +03:00

Add the "indirect flag" to the changeset blob format. Also the sqlite3session_indirect() API.

FossilOrigin-Name: 1feaf2d35fd9ec777319717ae2c2929d66fe7baa
This commit is contained in:
dan
2011-03-23 16:03:11 +00:00
parent 1e7a2d4315
commit b4480e942f
8 changed files with 260 additions and 85 deletions

View File

@ -19,6 +19,7 @@ struct sqlite3_session {
sqlite3 *db; /* Database handle session is attached to */
char *zDb; /* Name of database session is attached to */
int bEnable; /* True if currently recording */
int bIndirect; /* True if all changes are indirect */
int bAutoAttach; /* True to auto-attach tables */
int rc; /* Non-zero if an error has occurred */
sqlite3_session *pNext; /* Next session object on same db. */
@ -37,6 +38,7 @@ struct sqlite3_changeset_iter {
char *zTab; /* Current table */
int nCol; /* Number of columns in zTab */
int op; /* Current operation */
int bIndirect; /* True if current change was indirect */
sqlite3_value **apValue; /* old.* and new.* values */
};
@ -131,6 +133,7 @@ struct SessionTable {
*/
struct SessionChange {
int bInsert; /* True if row was inserted this session */
int bIndirect; /* True if this change is "indirect" */
int nRecord; /* Number of bytes in buffer aRecord[] */
u8 *aRecord; /* Buffer containing old.* record */
SessionChange *pNext; /* For hash-table collisions */
@ -660,8 +663,6 @@ static void sessionPreupdateOneChange(
SessionTable *pTab
){
sqlite3 *db = pSession->db;
SessionChange *pChange;
SessionChange *pC;
int iHash;
int bNullPk = 0;
int rc = SQLITE_OK;
@ -679,7 +680,8 @@ static void sessionPreupdateOneChange(
** the hash table. Otherwise, set pChange to NULL.
*/
rc = sessionPreupdateHash(db, pTab, op==SQLITE_INSERT, &iHash, &bNullPk);
if( bNullPk==0 ){
if( rc==SQLITE_OK && bNullPk==0 ){
SessionChange *pC;
for(pC=pTab->apChange[iHash]; rc==SQLITE_OK && pC; pC=pC->pNext){
int bEqual;
rc = sessionPreupdateEqual(db, pTab, pC, op==SQLITE_INSERT, &bEqual);
@ -689,9 +691,11 @@ static void sessionPreupdateOneChange(
/* Create a new change object containing all the old values (if
** this is an SQLITE_UPDATE or SQLITE_DELETE), or just the PK
** values (if this is an INSERT). */
SessionChange *pChange; /* New change object */
int nByte; /* Number of bytes to allocate */
int i; /* Used to iterate through columns */
assert( rc==SQLITE_OK );
pTab->nEntry++;
/* Figure out how large an allocation is required */
@ -732,6 +736,9 @@ static void sessionPreupdateOneChange(
}
if( rc==SQLITE_OK ){
/* Add the change back to the hash-table */
if( pSession->bIndirect || sqlite3_preupdate_depth(pSession->db) ){
pChange->bIndirect = 1;
}
pChange->nRecord = nByte;
pChange->bInsert = (op==SQLITE_INSERT);
pChange->pNext = pTab->apChange[iHash];
@ -739,6 +746,12 @@ static void sessionPreupdateOneChange(
}else{
sqlite3_free(pChange);
}
}else if( rc==SQLITE_OK && pC->bIndirect ){
/* If the existing change is considered "indirect", but this current
** change is "direct", mark the change object as direct. */
if( sqlite3_preupdate_depth(pSession->db)==0 && pSession->bIndirect==0 ){
pC->bIndirect = 0;
}
}
}
@ -1136,6 +1149,7 @@ static void sessionAppendUpdate(
u8 *pCsr = p->aRecord; /* Used to iterate through old.* values */
sessionAppendByte(pBuf, SQLITE_UPDATE, pRc);
sessionAppendByte(pBuf, p->bIndirect, pRc);
for(i=0; i<sqlite3_column_count(pStmt); i++){
int bChanged = 0;
int nAdvance;
@ -1366,6 +1380,7 @@ int sqlite3session_changeset(
int iCol;
if( p->bInsert ){
sessionAppendByte(&buf, SQLITE_INSERT, &rc);
sessionAppendByte(&buf, p->bIndirect, &rc);
for(iCol=0; iCol<nCol; iCol++){
sessionAppendCol(&buf, pSel, iCol, &rc);
}
@ -1375,6 +1390,7 @@ int sqlite3session_changeset(
}else if( !p->bInsert ){
/* A DELETE change */
sessionAppendByte(&buf, SQLITE_DELETE, &rc);
sessionAppendByte(&buf, p->bIndirect, &rc);
sessionAppendBlob(&buf, p->aRecord, p->nRecord, &rc);
}
if( rc==SQLITE_OK ){
@ -1416,6 +1432,20 @@ int sqlite3session_enable(sqlite3_session *pSession, int bEnable){
return ret;
}
/*
** Enable or disable the session object passed as the first argument.
*/
int sqlite3session_indirect(sqlite3_session *pSession, int bIndirect){
int ret;
sqlite3_mutex_enter(sqlite3_db_mutex(pSession->db));
if( bIndirect>=0 ){
pSession->bIndirect = bIndirect;
}
ret = pSession->bIndirect;
sqlite3_mutex_leave(sqlite3_db_mutex(pSession->db));
return ret;
}
/*
** Create an iterator used to iterate through the contents of a changeset.
*/
@ -1548,6 +1578,7 @@ int sqlite3changeset_next(sqlite3_changeset_iter *p){
p->zTab = (char *)aChange;
aChange += (strlen((char *)aChange) + 1);
p->op = *(aChange++);
p->bIndirect = *(aChange++);
sqlite3_free(p->apValue);
nByte = sizeof(sqlite3_value *) * p->nCol * 2;
p->apValue = (sqlite3_value **)sqlite3_malloc(nByte);
@ -1557,6 +1588,7 @@ int sqlite3changeset_next(sqlite3_changeset_iter *p){
memset(p->apValue, 0, sizeof(sqlite3_value*)*p->nCol*2);
}else{
p->op = c;
p->bIndirect = *(aChange++);
}
if( p->op!=SQLITE_UPDATE && p->op!=SQLITE_DELETE && p->op!=SQLITE_INSERT ){
return (p->rc = SQLITE_CORRUPT);
@ -1587,11 +1619,13 @@ int sqlite3changeset_op(
sqlite3_changeset_iter *pIter, /* Iterator handle */
const char **pzTab, /* OUT: Pointer to table name */
int *pnCol, /* OUT: Number of columns in table */
int *pOp /* OUT: SQLITE_INSERT, DELETE or UPDATE */
int *pOp, /* OUT: SQLITE_INSERT, DELETE or UPDATE */
int *pbIndirect /* OUT: True if change is indirect */
){
*pOp = pIter->op;
*pnCol = pIter->nCol;
*pzTab = pIter->zTab;
if( pbIndirect ) *pbIndirect = pIter->bIndirect;
return SQLITE_OK;
}
@ -1740,31 +1774,33 @@ int sqlite3changeset_invert(
case SQLITE_INSERT:
case SQLITE_DELETE: {
int nByte;
u8 *aEnd = &aIn[i+1];
u8 *aEnd = &aIn[i+2];
sessionReadRecord(&aEnd, nCol, 0);
aOut[i] = (eType==SQLITE_DELETE ? SQLITE_INSERT : SQLITE_DELETE);
nByte = aEnd - &aIn[i+1];
memcpy(&aOut[i+1], &aIn[i+1], nByte);
i += 1 + nByte;
aOut[i+1] = aIn[i+1];
nByte = aEnd - &aIn[i+2];
memcpy(&aOut[i+2], &aIn[i+2], nByte);
i += 2 + nByte;
break;
}
case SQLITE_UPDATE: {
int nByte1; /* Size of old.* record in bytes */
int nByte2; /* Size of new.* record in bytes */
u8 *aEnd = &aIn[i+1];
u8 *aEnd = &aIn[i+2];
sessionReadRecord(&aEnd, nCol, 0);
nByte1 = aEnd - &aIn[i+1];
nByte1 = aEnd - &aIn[i+2];
sessionReadRecord(&aEnd, nCol, 0);
nByte2 = aEnd - &aIn[i+1] - nByte1;
nByte2 = aEnd - &aIn[i+2] - nByte1;
aOut[i] = SQLITE_UPDATE;
memcpy(&aOut[i+1], &aIn[i+1+nByte1], nByte2);
memcpy(&aOut[i+1+nByte2], &aIn[i+1], nByte1);
aOut[i+1] = aIn[i+1];
memcpy(&aOut[i+2], &aIn[i+2+nByte1], nByte2);
memcpy(&aOut[i+2+nByte2], &aIn[i+2], nByte1);
i += 1 + nByte1 + nByte2;
i += 2 + nByte1 + nByte2;
break;
}
@ -2097,7 +2133,7 @@ static int sessionSeekToRow(
int op; /* Changset operation (SQLITE_UPDATE etc.) */
const char *zDummy; /* Unused */
sqlite3changeset_op(pIter, &zDummy, &nCol, &op);
sqlite3changeset_op(pIter, &zDummy, &nCol, &op, 0);
rc = sessionBindRow(pIter,
op==SQLITE_INSERT ? sqlite3changeset_new : sqlite3changeset_old,
nCol, abPK, pSelect
@ -2160,7 +2196,7 @@ static int sessionConflictHandler(
int op;
const char *zDummy;
sqlite3changeset_op(pIter, &zDummy, &nCol, &op);
sqlite3changeset_op(pIter, &zDummy, &nCol, &op, 0);
assert( eType==SQLITE_CHANGESET_CONFLICT || eType==SQLITE_CHANGESET_DATA );
assert( SQLITE_CHANGESET_CONFLICT+1==SQLITE_CHANGESET_CONSTRAINT );
@ -2248,7 +2284,7 @@ static int sessionApplyOneOp(
assert( p->azCol && p->abPK );
assert( !pbReplace || *pbReplace==0 );
sqlite3changeset_op(pIter, &zDummy, &nCol, &op);
sqlite3changeset_op(pIter, &zDummy, &nCol, &op, 0);
if( op==SQLITE_DELETE ){
@ -2362,7 +2398,7 @@ int sqlite3changeset_apply(
int bReplace = 0;
int bRetry = 0;
const char *zNew;
sqlite3changeset_op(pIter, &zNew, &nCol, &op);
sqlite3changeset_op(pIter, &zNew, &nCol, &op, 0);
if( zTab==0 || sqlite3_strnicmp(zNew, zTab, nTab+1) ){
sqlite3_free(sApply.azCol);