1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-07-27 20:41:58 +03:00

Merge the sessions-diff branch with this one.

FossilOrigin-Name: 805baa57e5d2e97dccddc08eb72f2564df4802e8
This commit is contained in:
dan
2015-04-11 17:09:36 +00:00
6 changed files with 612 additions and 65 deletions

View File

@ -25,6 +25,15 @@ typedef struct SessionInput SessionInput;
# endif
#endif
typedef struct SessionHook SessionHook;
struct SessionHook {
void *pCtx;
int (*xOld)(void*,int,sqlite3_value**);
int (*xNew)(void*,int,sqlite3_value**);
int (*xCount)(void*);
int (*xDepth)(void*);
};
/*
** Session handle structure.
*/
@ -39,6 +48,7 @@ struct sqlite3_session {
int (*xTableFilter)(void *pCtx, const char *zTab);
sqlite3_session *pNext; /* Next session object on same db. */
SessionTable *pTable; /* List of attached tables */
SessionHook hook; /* APIs to grab new and old data with */
};
/*
@ -438,7 +448,7 @@ static unsigned int sessionHashAppendType(unsigned int h, int eType){
** and the output variables are set as described above.
*/
static int sessionPreupdateHash(
sqlite3 *db, /* Database handle */
sqlite3_session *pSession, /* Session object that owns pTab */
SessionTable *pTab, /* Session table handle */
int bNew, /* True to hash the new.* PK */
int *piHash, /* OUT: Hash value */
@ -448,7 +458,7 @@ static int sessionPreupdateHash(
int i; /* Used to iterate through columns */
assert( *pbNullPK==0 );
assert( pTab->nCol==sqlite3_preupdate_count(db) );
assert( pTab->nCol==pSession->hook.xCount(pSession->hook.pCtx) );
for(i=0; i<pTab->nCol; i++){
if( pTab->abPK[i] ){
int rc;
@ -456,9 +466,9 @@ static int sessionPreupdateHash(
sqlite3_value *pVal;
if( bNew ){
rc = sqlite3_preupdate_new(db, i, &pVal);
rc = pSession->hook.xNew(pSession->hook.pCtx, i, &pVal);
}else{
rc = sqlite3_preupdate_old(db, i, &pVal);
rc = pSession->hook.xOld(pSession->hook.pCtx, i, &pVal);
}
if( rc!=SQLITE_OK ) return rc;
@ -764,11 +774,12 @@ static int sessionMergeUpdate(
** false.
*/
static int sessionPreupdateEqual(
sqlite3 *db, /* Database handle */
sqlite3_session *pSession, /* Session object that owns SessionTable */
SessionTable *pTab, /* Table associated with change */
SessionChange *pChange, /* Change to compare to */
int op /* Current pre-update operation */
){
sqlite3 *db = pSession->db;
int iCol; /* Used to iterate through columns */
u8 *a = pChange->aRecord; /* Cursor used to scan change record */
@ -787,11 +798,11 @@ static int sessionPreupdateEqual(
** within sessionPreupdateHash(). The first two asserts below verify
** this (that the method has already been called). */
if( op==SQLITE_INSERT ){
assert( db->pPreUpdate->pNewUnpacked || db->pPreUpdate->aNew );
rc = sqlite3_preupdate_new(db, iCol, &pVal);
/* assert( db->pPreUpdate->pNewUnpacked || db->pPreUpdate->aNew ); */
rc = pSession->hook.xNew(pSession->hook.pCtx, iCol, &pVal);
}else{
assert( db->pPreUpdate->pUnpacked );
rc = sqlite3_preupdate_old(db, iCol, &pVal);
/* assert( db->pPreUpdate->pUnpacked ); */
rc = pSession->hook.xOld(pSession->hook.pCtx, iCol, &pVal);
}
assert( rc==SQLITE_OK );
if( sqlite3_value_type(pVal)!=eType ) return 0;
@ -1019,7 +1030,7 @@ static int sessionInitTable(sqlite3_session *pSession, SessionTable *pTab){
);
}
if( pSession->rc==SQLITE_OK
&& pTab->nCol!=sqlite3_preupdate_count(pSession->db)
&& pTab->nCol!=pSession->hook.xCount(pSession->hook.pCtx)
){
pSession->rc = SQLITE_SCHEMA;
}
@ -1039,9 +1050,8 @@ static void sessionPreupdateOneChange(
sqlite3_session *pSession, /* Session object pTab is attached to */
SessionTable *pTab /* Table that change applies to */
){
sqlite3 *db = pSession->db;
int iHash;
int bNullPk = 0;
int bNull = 0;
int rc = SQLITE_OK;
if( pSession->rc ) return;
@ -1058,14 +1068,14 @@ static void sessionPreupdateOneChange(
/* Calculate the hash-key for this change. If the primary key of the row
** includes a NULL value, exit early. Such changes are ignored by the
** session module. */
rc = sessionPreupdateHash(db, pTab, op==SQLITE_INSERT, &iHash, &bNullPk);
rc = sessionPreupdateHash(pSession, pTab, op==SQLITE_INSERT, &iHash, &bNull);
if( rc!=SQLITE_OK ) goto error_out;
if( bNullPk==0 ){
if( bNull==0 ){
/* Search the hash table for an existing record for this row. */
SessionChange *pC;
for(pC=pTab->apChange[iHash]; pC; pC=pC->pNext){
if( sessionPreupdateEqual(db, pTab, pC, op) ) break;
if( sessionPreupdateEqual(pSession, pTab, pC, op) ) break;
}
if( pC==0 ){
@ -1084,10 +1094,10 @@ static void sessionPreupdateOneChange(
for(i=0; i<pTab->nCol; i++){
sqlite3_value *p = 0;
if( op!=SQLITE_INSERT ){
TESTONLY(int trc = ) sqlite3_preupdate_old(pSession->db, i, &p);
TESTONLY(int trc = ) pSession->hook.xOld(pSession->hook.pCtx, i, &p);
assert( trc==SQLITE_OK );
}else if( pTab->abPK[i] ){
TESTONLY(int trc = ) sqlite3_preupdate_new(pSession->db, i, &p);
TESTONLY(int trc = ) pSession->hook.xNew(pSession->hook.pCtx, i, &p);
assert( trc==SQLITE_OK );
}
@ -1115,15 +1125,15 @@ static void sessionPreupdateOneChange(
for(i=0; i<pTab->nCol; i++){
sqlite3_value *p = 0;
if( op!=SQLITE_INSERT ){
sqlite3_preupdate_old(pSession->db, i, &p);
pSession->hook.xOld(pSession->hook.pCtx, i, &p);
}else if( pTab->abPK[i] ){
sqlite3_preupdate_new(pSession->db, i, &p);
pSession->hook.xNew(pSession->hook.pCtx, i, &p);
}
sessionSerializeValue(&pChange->aRecord[nByte], p, &nByte);
}
/* Add the change to the hash-table */
if( pSession->bIndirect || sqlite3_preupdate_depth(pSession->db) ){
if( pSession->bIndirect || pSession->hook.xDepth(pSession->hook.pCtx) ){
pChange->bIndirect = 1;
}
pChange->nRecord = nByte;
@ -1134,7 +1144,9 @@ static void sessionPreupdateOneChange(
}else if( pC->bIndirect ){
/* If the existing change is considered "indirect", but this current
** change is "direct", mark the change object as direct. */
if( sqlite3_preupdate_depth(pSession->db)==0 && pSession->bIndirect==0 ){
if( pSession->hook.xDepth(pSession->hook.pCtx)==0
&& pSession->bIndirect==0
){
pC->bIndirect = 0;
}
}
@ -1147,6 +1159,39 @@ static void sessionPreupdateOneChange(
}
}
static int sessionFindTable(
sqlite3_session *pSession,
const char *zName,
SessionTable **ppTab
){
int rc = SQLITE_OK;
int nName = sqlite3Strlen30(zName);
SessionTable *pRet;
/* Search for an existing table */
for(pRet=pSession->pTable; pRet; pRet=pRet->pNext){
if( 0==sqlite3_strnicmp(pRet->zName, zName, nName+1) ) break;
}
if( pRet==0 && pSession->bAutoAttach ){
/* If there is a table-filter configured, invoke it. If it returns 0,
** do not automatically add the new table. */
if( pSession->xTableFilter==0
|| pSession->xTableFilter(pSession->pFilterCtx, zName)
){
rc = sqlite3session_attach(pSession, zName);
if( rc==SQLITE_OK ){
pRet = pSession->pTable;
assert( 0==sqlite3_strnicmp(pRet->zName, zName, nName+1) );
}
}
}
assert( rc==SQLITE_OK || pRet==0 );
*ppTab = pRet;
return rc;
}
/*
** The 'pre-update' hook registered by this module with SQLite databases.
*/
@ -1161,7 +1206,6 @@ static void xPreUpdate(
){
sqlite3_session *pSession;
int nDb = sqlite3Strlen30(zDb);
int nName = sqlite3Strlen30(zName);
assert( sqlite3_mutex_held(db->mutex) );
@ -1175,37 +1219,312 @@ static void xPreUpdate(
if( pSession->rc ) continue;
if( sqlite3_strnicmp(zDb, pSession->zDb, nDb+1) ) continue;
for(pTab=pSession->pTable; pTab || pSession->bAutoAttach; pTab=pTab->pNext){
if( !pTab ){
/* This branch is taken if table zName has not yet been attached to
** this session and the auto-attach flag is set. */
/* If there is a table-filter configured, invoke it. If it returns 0,
** this change will not be recorded. Break out of the loop early in
** this case. */
if( pSession->xTableFilter
&& pSession->xTableFilter(pSession->pFilterCtx, zName)==0
){
break;
}
pSession->rc = sqlite3session_attach(pSession,zName);
if( pSession->rc ) break;
pTab = pSession->pTable;
assert( 0==sqlite3_strnicmp(pTab->zName, zName, nName+1) );
}
if( 0==sqlite3_strnicmp(pTab->zName, zName, nName+1) ){
sessionPreupdateOneChange(op, pSession, pTab);
if( op==SQLITE_UPDATE ){
sessionPreupdateOneChange(SQLITE_INSERT, pSession, pTab);
}
break;
pSession->rc = sessionFindTable(pSession, zName, &pTab);
if( pTab ){
assert( pSession->rc==SQLITE_OK );
sessionPreupdateOneChange(op, pSession, pTab);
if( op==SQLITE_UPDATE ){
sessionPreupdateOneChange(SQLITE_INSERT, pSession, pTab);
}
}
}
}
/*
** The pre-update hook implementations.
*/
static int sessionPreupdateOld(void *pCtx, int iVal, sqlite3_value **ppVal){
return sqlite3_preupdate_old((sqlite3*)pCtx, iVal, ppVal);
}
static int sessionPreupdateNew(void *pCtx, int iVal, sqlite3_value **ppVal){
return sqlite3_preupdate_new((sqlite3*)pCtx, iVal, ppVal);
}
static int sessionPreupdateCount(void *pCtx){
return sqlite3_preupdate_count((sqlite3*)pCtx);
}
static int sessionPreupdateDepth(void *pCtx){
return sqlite3_preupdate_depth((sqlite3*)pCtx);
}
/*
** Install the pre-update hooks on the session object passed as the only
** argument.
*/
static void sessionPreupdateHooks(
sqlite3_session *pSession
){
pSession->hook.pCtx = (void*)pSession->db;
pSession->hook.xOld = sessionPreupdateOld;
pSession->hook.xNew = sessionPreupdateNew;
pSession->hook.xCount = sessionPreupdateCount;
pSession->hook.xDepth = sessionPreupdateDepth;
}
typedef struct SessionDiffCtx SessionDiffCtx;
struct SessionDiffCtx {
sqlite3_stmt *pStmt;
int nOldOff;
};
/*
** The diff hook implementations.
*/
static int sessionDiffOld(void *pCtx, int iVal, sqlite3_value **ppVal){
SessionDiffCtx *p = (SessionDiffCtx*)pCtx;
*ppVal = sqlite3_column_value(p->pStmt, iVal+p->nOldOff);
return SQLITE_OK;
}
static int sessionDiffNew(void *pCtx, int iVal, sqlite3_value **ppVal){
SessionDiffCtx *p = (SessionDiffCtx*)pCtx;
*ppVal = sqlite3_column_value(p->pStmt, iVal);
return SQLITE_OK;
}
static int sessionDiffCount(void *pCtx){
SessionDiffCtx *p = (SessionDiffCtx*)pCtx;
return p->nOldOff ? p->nOldOff : sqlite3_column_count(p->pStmt);
}
static int sessionDiffDepth(void *pCtx){
return 0;
}
/*
** Install the diff hooks on the session object passed as the only
** argument.
*/
static void sessionDiffHooks(
sqlite3_session *pSession,
SessionDiffCtx *pDiffCtx
){
pSession->hook.pCtx = (void*)pDiffCtx;
pSession->hook.xOld = sessionDiffOld;
pSession->hook.xNew = sessionDiffNew;
pSession->hook.xCount = sessionDiffCount;
pSession->hook.xDepth = sessionDiffDepth;
}
static char *sessionExprComparePK(
int nCol,
const char *zDb1, const char *zDb2,
const char *zTab,
const char **azCol, u8 *abPK
){
int i;
const char *zSep = "";
char *zRet = 0;
for(i=0; i<nCol; i++){
if( abPK[i] ){
zRet = sqlite3_mprintf("%z%s\"%w\".\"%w\".\"%w\"=\"%w\".\"%w\".\"%w\"",
zRet, zSep, zDb1, zTab, azCol[i], zDb2, zTab, azCol[i]
);
zSep = " AND ";
if( zRet==0 ) break;
}
}
return zRet;
}
static char *sessionExprCompareOther(
int nCol,
const char *zDb1, const char *zDb2,
const char *zTab,
const char **azCol, u8 *abPK
){
int i;
const char *zSep = "";
char *zRet = 0;
int bHave = 0;
for(i=0; i<nCol; i++){
if( abPK[i]==0 ){
bHave = 1;
zRet = sqlite3_mprintf(
"%z%s\"%w\".\"%w\".\"%w\" IS NOT \"%w\".\"%w\".\"%w\"",
zRet, zSep, zDb1, zTab, azCol[i], zDb2, zTab, azCol[i]
);
zSep = " OR ";
if( zRet==0 ) break;
}
}
if( bHave==0 ){
assert( zRet==0 );
zRet = sqlite3_mprintf("0");
}
return zRet;
}
static char *sessionSelectFindNew(
int nCol,
const char *zDb1, /* Pick rows in this db only */
const char *zDb2, /* But not in this one */
const char *zTbl, /* Table name */
const char *zExpr
){
char *zRet = sqlite3_mprintf(
"SELECT * FROM \"%w\".\"%w\" WHERE NOT EXISTS ("
" SELECT 1 FROM \"%w\".\"%w\" WHERE %s"
")",
zDb1, zTbl, zDb2, zTbl, zExpr
);
return zRet;
}
static int sessionDiffFindNew(
int op,
sqlite3_session *pSession,
SessionTable *pTab,
const char *zDb1,
const char *zDb2,
char *zExpr
){
int rc = SQLITE_OK;
char *zStmt = sessionSelectFindNew(pTab->nCol, zDb1, zDb2, pTab->zName,zExpr);
if( zStmt==0 ){
rc = SQLITE_NOMEM;
}else{
sqlite3_stmt *pStmt;
rc = sqlite3_prepare(pSession->db, zStmt, -1, &pStmt, 0);
if( rc==SQLITE_OK ){
SessionDiffCtx *pDiffCtx = (SessionDiffCtx*)pSession->hook.pCtx;
pDiffCtx->pStmt = pStmt;
pDiffCtx->nOldOff = 0;
while( SQLITE_ROW==sqlite3_step(pStmt) ){
sessionPreupdateOneChange(op, pSession, pTab);
}
rc = sqlite3_finalize(pStmt);
}
sqlite3_free(zStmt);
}
return rc;
}
static int sessionDiffFindModified(
sqlite3_session *pSession,
SessionTable *pTab,
const char *zFrom,
const char *zExpr
){
int rc = SQLITE_OK;
char *zExpr2 = sessionExprCompareOther(pTab->nCol,
pSession->zDb, zFrom, pTab->zName, pTab->azCol, pTab->abPK
);
if( zExpr2==0 ){
rc = SQLITE_NOMEM;
}else{
char *zStmt = sqlite3_mprintf(
"SELECT * FROM \"%w\".\"%w\", \"%w\".\"\%w\" WHERE %s AND %z",
pSession->zDb, pTab->zName, zFrom, pTab->zName, zExpr, zExpr2
);
if( zStmt==0 ){
rc = SQLITE_NOMEM;
}else{
sqlite3_stmt *pStmt;
rc = sqlite3_prepare(pSession->db, zStmt, -1, &pStmt, 0);
if( rc==SQLITE_OK ){
SessionDiffCtx *pDiffCtx = (SessionDiffCtx*)pSession->hook.pCtx;
pDiffCtx->pStmt = pStmt;
pDiffCtx->nOldOff = pTab->nCol;
while( SQLITE_ROW==sqlite3_step(pStmt) ){
sessionPreupdateOneChange(SQLITE_UPDATE, pSession, pTab);
}
rc = sqlite3_finalize(pStmt);
}
sqlite3_free(zStmt);
}
}
return rc;
}
int sqlite3session_diff(
sqlite3_session *pSession,
const char *zFrom,
const char *zTbl,
char **pzErrMsg
){
const char *zDb = pSession->zDb;
int rc = pSession->rc;
SessionDiffCtx d;
memset(&d, 0, sizeof(d));
sessionDiffHooks(pSession, &d);
if( pzErrMsg ) *pzErrMsg = 0;
if( rc==SQLITE_OK ){
char *zExpr = 0;
sqlite3 *db = pSession->db;
SessionTable *pTo; /* Table zTbl */
/* Locate and if necessary initialize the target table object */
rc = sessionFindTable(pSession, zTbl, &pTo);
if( pTo==0 ) goto diff_out;
if( pTo->nCol==0 ){
rc = pSession->rc = sessionTableInfo(db, zDb,
pTo->zName, &pTo->nCol, 0, &pTo->azCol, &pTo->abPK
);
}
/* Check the table schemas match */
if( rc==SQLITE_OK ){
int nCol; /* Columns in zFrom.zTbl */
u8 *abPK;
const char **azCol = 0;
rc = sessionTableInfo(db, zFrom, zTbl, &nCol, 0, &azCol, &abPK);
if( rc==SQLITE_OK ){
int bMismatch = 0;
if( pTo->nCol!=nCol || memcmp(pTo->abPK, abPK, nCol) ){
bMismatch = 1;
}else{
int i;
for(i=0; i<nCol; i++){
if( sqlite3_stricmp(azCol[i], pTo->azCol[i]) ) bMismatch = 1;
}
}
if( bMismatch ){
*pzErrMsg = sqlite3_mprintf("table schemas do not match");
rc = SQLITE_ERROR;
}
}
sqlite3_free(azCol);
}
if( rc==SQLITE_OK ){
zExpr = sessionExprComparePK(pTo->nCol,
zDb, zFrom, pTo->zName, pTo->azCol, pTo->abPK
);
}
/* Find new rows */
if( rc==SQLITE_OK ){
rc = sessionDiffFindNew(SQLITE_INSERT, pSession, pTo, zDb, zFrom, zExpr);
}
/* Find old rows */
if( rc==SQLITE_OK ){
rc = sessionDiffFindNew(SQLITE_DELETE, pSession, pTo, zFrom, zDb, zExpr);
}
/* Find modified rows */
if( rc==SQLITE_OK ){
rc = sessionDiffFindModified(pSession, pTo, zFrom, zExpr);
}
sqlite3_free(zExpr);
}
diff_out:
sessionPreupdateHooks(pSession);
return rc;
}
/*
** Create a session object. This session object will record changes to
** database zDb attached to connection db.
@ -1230,6 +1549,7 @@ int sqlite3session_create(
pNew->zDb = (char *)&pNew[1];
pNew->bEnable = 1;
memcpy(pNew->zDb, zDb, nDb+1);
sessionPreupdateHooks(pNew);
/* Add the new session object to the linked list of session objects
** attached to database handle $db. Do this under the cover of the db