From d5f0767c9ceb8e031813dd71a25e41a56de27487 Mon Sep 17 00:00:00 2001 From: dan Date: Fri, 11 Mar 2011 19:05:52 +0000 Subject: [PATCH] Add the sqlite3changeset_apply() function. Does not yet handle all cases. FossilOrigin-Name: 2b19be7bf753c7dd12e1c3b384981a3ea1bc8145 --- ext/session/sqlite3session.c | 606 ++++++++++++++++++++++++++++++++++- ext/session/sqlite3session.h | 45 +++ ext/session/test_session.c | 137 ++++++++ manifest | 18 +- manifest.uuid | 2 +- test/session1.test | 132 ++++++++ 6 files changed, 924 insertions(+), 16 deletions(-) diff --git a/ext/session/sqlite3session.c b/ext/session/sqlite3session.c index 0b6a985279..86778d65f0 100644 --- a/ext/session/sqlite3session.c +++ b/ext/session/sqlite3session.c @@ -573,6 +573,47 @@ static void sessionAppendBlob( } } +static void sessionAppendStr( + SessionBuffer *p, + const char *zStr, + int *pRc +){ + int nStr = strlen(zStr); + if( *pRc==SQLITE_OK && 0==sessionBufferGrow(p, nStr, pRc) ){ + memcpy(&p->aBuf[p->nBuf], zStr, nStr); + p->nBuf += nStr; + } +} + +static void sessionAppendInteger( + SessionBuffer *p, + int iVal, + int *pRc +){ + char aBuf[24]; + sqlite3_snprintf(sizeof(aBuf)-1, aBuf, "%d", iVal); + sessionAppendStr(p, aBuf, pRc); +} + +static void sessionAppendIdent( + SessionBuffer *p, + const char *zStr, + int *pRc +){ + int nStr = strlen(zStr)*2 + 2 + 1; + if( *pRc==SQLITE_OK && 0==sessionBufferGrow(p, nStr, pRc) ){ + char *zOut = (char *)&p->aBuf[p->nBuf]; + const char *zIn = zStr; + *zOut++ = '"'; + while( *zIn ){ + if( *zIn=='"' ) *zOut++ = '"'; + *zOut++ = *(zIn++); + } + *zOut++ = '"'; + p->nBuf = ((u8 *)zOut - p->aBuf); + } +} + static void sessionAppendCol( SessionBuffer *p, sqlite3_stmt *pStmt, @@ -616,6 +657,7 @@ static void sessionAppendUpdate( sqlite3_stmt *pStmt, SessionBuffer *pBuf, SessionChange *p, + u8 *abPK, int *pRc ){ if( *pRc==SQLITE_OK ){ @@ -667,6 +709,9 @@ static void sessionAppendUpdate( nCopy = nAdvance; } } + if( abPK[i] ){ + nCopy = nAdvance; + } if( nCopy==0 ){ sessionAppendByte(pBuf, 0, pRc); @@ -686,8 +731,98 @@ static void sessionAppendUpdate( sqlite3_free(buf2.aBuf); } } +} +static int sessionTableInfo( + sqlite3 *db, /* Database connection */ + const char *zThis, /* Table name */ + int nCol, /* Expected number of columns */ + const char **pzTab, /* OUT: Copy of zThis */ + const char ***pazCol, /* OUT: Array of column names for table */ + u8 **pabPK /* OUT: Array of booleans - true for PK col */ +){ + char *zPragma; + sqlite3_stmt *pStmt; + int rc; + int nByte; + int nDbCol = 0; + int nThis; + int i; + u8 *pAlloc; + u8 *pFree = 0; + char **azCol; + u8 *abPK; + + nThis = strlen(zThis); + zPragma = sqlite3_mprintf("PRAGMA main.table_info('%q')", zThis); + if( !zPragma ) return SQLITE_NOMEM; + + rc = sqlite3_prepare_v2(db, zPragma, -1, &pStmt, 0); + sqlite3_free(zPragma); + if( rc!=SQLITE_OK ) return rc; + + nByte = nThis + 1; + while( SQLITE_ROW==sqlite3_step(pStmt) ){ + nByte += sqlite3_column_bytes(pStmt, 1); + nDbCol++; + } + rc = sqlite3_reset(pStmt); + + if( nDbCol!=nCol ){ + rc = SQLITE_SCHEMA; + } + if( rc==SQLITE_OK ){ + nByte += nDbCol * (sizeof(const char *) + sizeof(u8) + 1); + pAlloc = sqlite3_malloc(nByte); + if( pAlloc==0 ){ + rc = SQLITE_NOMEM; + } + } + if( rc==SQLITE_OK ){ + pFree = pAlloc; + if( pazCol ){ + azCol = (char **)pAlloc; + pAlloc = (u8 *)&azCol[nCol]; + } + if( pabPK ){ + abPK = (u8 *)pAlloc; + pAlloc = &abPK[nCol]; + } + if( pzTab ){ + memcpy(pAlloc, zThis, nThis+1); + *pzTab = (char *)pAlloc; + pAlloc += nThis+1; + } + + i = 0; + while( SQLITE_ROW==sqlite3_step(pStmt) ){ + int nName = sqlite3_column_bytes(pStmt, 1); + const unsigned char *zName = sqlite3_column_text(pStmt, 1); + if( zName==0 ) break; + if( pazCol ){ + memcpy(pAlloc, zName, nName+1); + azCol[i] = (char *)pAlloc; + pAlloc += nName+1; + } + if( pabPK ) abPK[i] = sqlite3_column_int(pStmt, 5); + i++; + } + rc = sqlite3_reset(pStmt); + + } + if( rc==SQLITE_OK ){ + if( pazCol ) *pazCol = (const char **)azCol; + if( pabPK ) *pabPK = abPK; + }else{ + if( pazCol ) *pazCol = 0; + if( pabPK ) *pabPK = 0; + if( pzTab ) *pzTab = 0; + sqlite3_free(pFree); + } + + sqlite3_finalize(pStmt); + return rc; } /* @@ -717,6 +852,7 @@ int sqlite3session_changeset( sqlite3_stmt *pStmt = 0; int bNoop = 1; int nRewind = buf.nBuf; + u8 *abPK = 0; /* Write a table header */ sessionAppendByte(&buf, 'T', &rc); @@ -740,6 +876,10 @@ int sqlite3session_changeset( rc = SQLITE_SCHEMA; } + if( rc==SQLITE_OK ){ + rc = sessionTableInfo(db, pTab->zName, pTab->nCol, 0, 0, &abPK); + } + for(i=0; inChange; i++){ SessionChange *p; for(p=pTab->apChange[i]; rc==SQLITE_OK && p; p=p->pNext){ @@ -747,7 +887,7 @@ int sqlite3session_changeset( if( sqlite3_step(pStmt)==SQLITE_ROW ){ int iCol; if( p->aRecord ){ - sessionAppendUpdate(pStmt, &buf, p, &rc); + sessionAppendUpdate(pStmt, &buf, p, abPK, &rc); }else{ sessionAppendByte(&buf, SQLITE_INSERT, &rc); for(iCol=0; iColnCol; iCol++){ @@ -766,6 +906,7 @@ int sqlite3session_changeset( } sqlite3_finalize(pStmt); + sqlite3_free(abPK); if( bNoop ){ buf.nBuf = nRewind; @@ -797,6 +938,7 @@ struct sqlite3_changeset_iter { u8 *pNext; /* Pointer to next change within aChangeset */ int rc; + sqlite3_stmt *pConflict; /* Conflicting row, if any */ char *zTab; /* Current table */ int nCol; /* Number of columns in zTab */ int op; /* Current operation */ @@ -947,9 +1089,9 @@ int sqlite3changeset_next(sqlite3_changeset_iter *p){ */ int sqlite3changeset_op( sqlite3_changeset_iter *pIter, - const char **pzTab, /* OUT: Pointer to table name */ - int *pnCol, /* OUT: Number of columns in table */ - int *pOp /* OUT: SQLITE_INSERT, DELETE or UPDATE */ + const char **pzTab, /* OUT: Pointer to table name */ + int *pnCol, /* OUT: Number of columns in table */ + int *pOp /* OUT: SQLITE_INSERT, DELETE or UPDATE */ ){ *pOp = pIter->op; *pnCol = pIter->nCol; @@ -960,8 +1102,11 @@ int sqlite3changeset_op( int sqlite3changeset_old( sqlite3_changeset_iter *pIter, int iVal, - sqlite3_value **ppValue /* OUT: Old value (or NULL pointer) */ + sqlite3_value **ppValue /* OUT: Old value (or NULL pointer) */ ){ + if( pIter->op!=SQLITE_UPDATE && pIter->op!=SQLITE_DELETE ){ + return SQLITE_MISUSE; + } if( iVal<0 || iVal>=pIter->nCol ){ return SQLITE_RANGE; } @@ -972,8 +1117,11 @@ int sqlite3changeset_old( int sqlite3changeset_new( sqlite3_changeset_iter *pIter, int iVal, - sqlite3_value **ppValue /* OUT: New value (or NULL pointer) */ + sqlite3_value **ppValue /* OUT: New value (or NULL pointer) */ ){ + if( pIter->op!=SQLITE_UPDATE && pIter->op!=SQLITE_INSERT ){ + return SQLITE_MISUSE; + } if( iVal<0 || iVal>=pIter->nCol ){ return SQLITE_RANGE; } @@ -981,6 +1129,21 @@ int sqlite3changeset_new( return SQLITE_OK; } +int sqlite3changeset_conflict( + sqlite3_changeset_iter *pIter, + int iVal, + sqlite3_value **ppValue /* OUT: Value from conflicting row */ +){ + if( !pIter->pConflict ){ + return SQLITE_MISUSE; + } + if( iVal<0 || iVal>=sqlite3_column_count(pIter->pConflict) ){ + return SQLITE_RANGE; + } + *ppValue = sqlite3_column_value(pIter->pConflict, iVal); + return SQLITE_OK; +} + /* ** Finalize an iterator allocated with sqlite3changeset_start(). ** @@ -1073,5 +1236,436 @@ int sqlite3changeset_invert( return SQLITE_OK; } +static void sessionUpdateDeleteWhere( + SessionBuffer *pBuf, /* Buffer to append to */ + int nCol, /* Number of entries in azCol and abPK */ + const char **azCol, /* Column names */ + u8 *abPK, /* True for PK columns */ + int *pRc /* IN/OUT: Error code */ +){ + if( *pRc==SQLITE_OK ){ + int i; + const char *zSep = ""; + + sessionAppendStr(pBuf, " WHERE ", pRc); + + for(i=0; ipConflict = pSelect; + res = xConflict(pCtx, SQLITE_CHANGESET_DATA, pIter); + pIter->pConflict = 0; + sqlite3_reset(pSelect); + }else{ + rc = sqlite3_reset(pSelect); + if( rc==SQLITE_OK ){ + res = xConflict(pCtx, SQLITE_CHANGESET_NOTFOUND, pIter); + } + } + + }else if( rc==SQLITE_CONSTRAINT ){ + res = xConflict(pCtx, SQLITE_CHANGESET_CONSTRAINT, pIter); + rc = SQLITE_OK; + } + + if( rc!=SQLITE_OK ) break; + + }else if( op==SQLITE_UPDATE ){ + int i; + int res; + rc = sessionUpdateRow(db, zTab, nCol, azCol, abPK, &pUpdate); + for(i=0; rc==SQLITE_OK && ipConflict = pSelect; + res = xConflict(pCtx, SQLITE_CHANGESET_DATA, pIter); + pIter->pConflict = 0; + sqlite3_reset(pSelect); + }else{ + rc = sqlite3_reset(pSelect); + if( rc==SQLITE_OK ){ + res = xConflict(pCtx, SQLITE_CHANGESET_NOTFOUND, pIter); + } + } + }else if( rc==SQLITE_CONSTRAINT ){ + assert(0); + } + + }else{ + int i; + assert( op==SQLITE_INSERT ); + if( pInsert==0 ){ + SessionBuffer buf = {0, 0, 0}; + sessionAppendStr(&buf, "INSERT INTO main.", &rc); + sessionAppendIdent(&buf, zTab, &rc); + sessionAppendStr(&buf, " VALUES(?", &rc); + for(i=1; ipConflict = pSelect; + res = xConflict(pCtx, SQLITE_CHANGESET_CONFLICT, pIter); + pIter->pConflict = 0; + sqlite3_reset(pSelect); + }else{ + rc = sqlite3_reset(pSelect); + if( rc==SQLITE_OK ){ + res = xConflict(pCtx, SQLITE_CHANGESET_CONSTRAINT, pIter); + } + } + } + } + } + rc2 = sqlite3changeset_finalize(pIter); + if( rc==SQLITE_DONE ) rc = rc2; + + if( rc==SQLITE_OK ){ + rc = sqlite3_exec(db, "RELEASE changeset_apply", 0, 0, 0); + }else{ + sqlite3_exec(db, "ROLLBACK TO changeset_apply", 0, 0, 0); + sqlite3_exec(db, "RELEASE changeset_apply", 0, 0, 0); + } + + sqlite3_finalize(pInsert); + sqlite3_finalize(pDelete); + sqlite3_finalize(pUpdate); + sqlite3_finalize(pSelect); + sqlite3_free(azCol); + return rc; +} #endif /* #ifdef SQLITE_ENABLE_SESSION */ diff --git a/ext/session/sqlite3session.h b/ext/session/sqlite3session.h index fe6ca2075b..8c3a708ce8 100644 --- a/ext/session/sqlite3session.h +++ b/ext/session/sqlite3session.h @@ -112,6 +112,21 @@ int sqlite3changeset_new( int iVal, sqlite3_value **ppValue /* OUT: New value (or NULL pointer) */ ); +/* +** This function is only usable with sqlite3_changeset_iter objects passed +** to the xConflict callback by sqlite3changeset_apply(). It cannot be used +** with iterators created using sqlite3changeset_start(). +** +** It is used to access the "conflicting row" information available to the +** conflict handler if the second argument is either SQLITE_CHANGESET_DATA +** or SQLITE_CHANGESET_CONFLICT. +*/ +int sqlite3changeset_conflict( + sqlite3_changeset_iter *pIter, + int iVal, + sqlite3_value **ppValue /* OUT: Value from conflicting row */ +); + /* ** Finalize an iterator allocated with sqlite3changeset_start(). @@ -129,6 +144,36 @@ int sqlite3changeset_invert( int *pnOut, void **ppOut /* OUT: Inverse of input */ ); +/* +** Apply a changeset to a database. +** +** It is safe to execute SQL statements, including those that write to the +** table that the callback related to, from within the xConflict callback. +** This can be used to further customize the applications conflict +** resolution strategy. +*/ +int sqlite3changeset_apply( + sqlite3 *db, + int nChangeset, + void *pChangeset, + int(*xConflict)( + void *pCtx, /* Copy of fifth arg to _apply() */ + int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */ + sqlite3_changeset_iter *p /* Handle describing change and conflict */ + ), + void *pCtx +); + +/* Values passed as the second argument to a conflict-handler */ +#define SQLITE_CHANGESET_DATA 1 +#define SQLITE_CHANGESET_NOTFOUND 2 +#define SQLITE_CHANGESET_CONFLICT 3 +#define SQLITE_CHANGESET_CONSTRAINT 4 + +/* Valid return values from a conflict-handler */ +#define SQLITE_CHANGESET_OMIT 0 +#define SQLITE_CHANGESET_REPLACE 1 +#define SQLITE_CHANGESET_ABORT 2 #endif diff --git a/ext/session/test_session.c b/ext/session/test_session.c index 1e8a9eccf6..a805280787 100644 --- a/ext/session/test_session.c +++ b/ext/session/test_session.c @@ -168,6 +168,139 @@ static void test_append_value(Tcl_Obj *pList, sqlite3_value *pVal){ } } +typedef struct TestConflictHandler TestConflictHandler; +struct TestConflictHandler { + Tcl_Interp *interp; + Tcl_Obj *pScript; +}; + +static int test_conflict_handler( + void *pCtx, /* Pointer to TestConflictHandler structure */ + int eConf, /* DATA, MISSING, CONFLICT, CONSTRAINT */ + sqlite3_changeset_iter *pIter /* Handle describing change and conflict */ +){ + TestConflictHandler *p = (TestConflictHandler *)pCtx; + Tcl_Obj *pEval; + Tcl_Interp *interp = p->interp; + + int op; /* SQLITE_UPDATE, DELETE or INSERT */ + const char *zTab; /* Name of table conflict is on */ + int nCol; /* Number of columns in table zTab */ + + pEval = Tcl_DuplicateObj(p->pScript); + Tcl_IncrRefCount(pEval); + + sqlite3changeset_op(pIter, &zTab, &nCol, &op); + + /* Append the operation type. */ + Tcl_ListObjAppendElement(0, pEval, Tcl_NewStringObj( + op==SQLITE_INSERT ? "INSERT" : + op==SQLITE_UPDATE ? "UPDATE" : + "DELETE", -1 + )); + + /* Append the table name. */ + Tcl_ListObjAppendElement(0, pEval, Tcl_NewStringObj(zTab, -1)); + + /* Append the conflict type. */ + switch( eConf ){ + case SQLITE_CHANGESET_DATA: + Tcl_ListObjAppendElement(interp, pEval,Tcl_NewStringObj("DATA",-1)); + break; + case SQLITE_CHANGESET_NOTFOUND: + Tcl_ListObjAppendElement(interp, pEval,Tcl_NewStringObj("NOTFOUND",-1)); + break; + case SQLITE_CHANGESET_CONFLICT: + Tcl_ListObjAppendElement(interp, pEval,Tcl_NewStringObj("CONFLICT",-1)); + break; + case SQLITE_CHANGESET_CONSTRAINT: + Tcl_ListObjAppendElement(interp, pEval,Tcl_NewStringObj("CONSTRAINT",-1)); + break; + } + + /* If this is not an INSERT, append the old row */ + if( op!=SQLITE_INSERT ){ + int i; + Tcl_Obj *pOld = Tcl_NewObj(); + for(i=0; i