diff --git a/ext/session/sqlite3session.c b/ext/session/sqlite3session.c new file mode 100644 index 0000000000..eb024775fd --- /dev/null +++ b/ext/session/sqlite3session.c @@ -0,0 +1,993 @@ + +#ifdef SQLITE_ENABLE_SESSION + +#include "sqlite3session.h" +#include +#include + +#include "sqliteInt.h" +#include "vdbeInt.h" + +typedef struct RowChange RowChange; +typedef struct SessionTable SessionTable; +typedef struct SessionChange SessionChange; + +#if 0 +#ifndef SQLITE_AMALGAMATION +typedef unsigned char u8; +typedef unsigned long u32; +typedef sqlite3_uint64 u64; +#endif +#endif + +struct sqlite3_session { + sqlite3 *db; /* Database handle session is attached to */ + char *zDb; /* Name of database session is attached to */ + int rc; /* Non-zero if an error has occurred */ + sqlite3_session *pNext; /* Next session object on same db. */ + SessionTable *pTable; /* List of attached tables */ +}; + +/* +** Each session object maintains a set of the following structures, one +** for each table the session object is monitoring. The structures are +** stored in a linked list starting at sqlite3_session.pTable. +** +** The keys of the SessionTable.aChange[] hash table are all rows that have +** been modified in any way since the session object was attached to the +** table. +** +** The data associated with each hash-table entry is a structure containing +** a subset of the initial values that the modified row contained at the +** start of the session. Or no initial values if the row was inserted. +*/ +struct SessionTable { + SessionTable *pNext; + char *zName; /* Local name of table */ + int nCol; /* Number of columns in table zName */ + + /* Hash table of modified rows */ + int nEntry; /* NUmber of entries in hash table */ + int nChange; /* Size of apChange[] array */ + SessionChange **apChange; /* Hash table buckets */ +}; + +/* +** RECORD FORMAT: +** +** The following record format is similar to (but not compatible with) that +** used in SQLite database files. This format is used as part of the +** change-set binary format, and so must be architecture independent. +** +** Unlike the SQLite database record format, each field is self-contained - +** there is no separation of header and data. Each field begins with a +** single byte describing its type, as follows: +** +** 0x00: Undefined value. +** 0x01: Integer value. +** 0x02: Real value. +** 0x03: Text value. +** 0x04: Blob value. +** 0x05: SQL NULL value. +** +** Note that the above match the definitions of SQLITE_INTEGER, SQLITE_TEXT +** and so on in sqlite3.h. For undefined and NULL values, the field consists +** only of the single type byte. For other types of values, the type byte +** is followed by: +** +** Text values: +** A varint containing the number of bytes in the value (encoded using +** UTF-8). Followed by a buffer containing the UTF-8 representation +** of the text value. There is no nul terminator. +** +** Blob values: +** A varint containing the number of bytes in the value, followed by +** a buffer containing the value itself. +** +** Integer values: +** An 8-byte big-endian integer value. +** +** Real values: +** An 8-byte big-endian IEEE 754-2008 real value. +** +** Varint values are encoded in the same way as varints in the SQLite +** record format. +** +** CHANGESET FORMAT: +** +** A changeset is a collection of DELETE, UPDATE and INSERT operations on +** one or more tables. Operations on a single table are grouped together, +** but may occur in any order (i.e. deletes, updates and inserts are all +** mixed together). +** +** Each group of changes begins with a table header: +** +** 1 byte: Constant 0x54 (capital 'T') +** Varint: Big-endian integer set to the number of columns in the table. +** N bytes: Unqualified table name (encoded using UTF-8). Nul-terminated. +** +** Followed by one or more changes to the table. +** +** 1 byte: Either SQLITE_INSERT, UPDATE or DELETE. +** old.* record: (delete and update only) +** new.* record: (insert and update only) +*/ + +/* +** For each row modified during a session, there exists a single instance of +** this structure stored in a SessionTable.aChange[] hash table. +*/ +struct SessionChange { + sqlite3_int64 iKey; /* Key value */ + int nRecord; /* Number of bytes in buffer aRecord[] */ + u8 *aRecord; /* Buffer containing old.* record */ + SessionChange *pNext; /* For hash-table collisions */ +}; + + +static int sessionVarintPut(u8 *aBuf, u32 iVal){ + if( (iVal & ~0x7F)==0 ){ + if( aBuf ){ + aBuf[0] = (u8)iVal; + } + return 1; + } + if( (iVal & ~0x3FFF)==0 ){ + if( aBuf ){ + aBuf[0] = ((iVal >> 7) & 0x7F) | 0x80; + aBuf[1] = iVal & 0x7F; + } + return 2; + } + if( aBuf ){ + aBuf[0] = ((iVal >> 28) & 0x7F) | 0x80; + aBuf[1] = ((iVal >> 21) & 0x7F) | 0x80; + aBuf[2] = ((iVal >> 14) & 0x7F) | 0x80; + aBuf[3] = ((iVal >> 7) & 0x7F) | 0x80; + aBuf[4] = iVal & 0x7F; + } + return 5; +} + +static int sessionVarintGet(u8 *aBuf, int *piVal){ + int ret; + u64 v; + ret = (int)sqlite3GetVarint(aBuf, &v); + *piVal = (int)v; + return ret; +} + +static sqlite3_int64 sessionGetI64(u8 *aRec){ + return (((sqlite3_int64)aRec[0]) << 56) + + (((sqlite3_int64)aRec[1]) << 48) + + (((sqlite3_int64)aRec[2]) << 40) + + (((sqlite3_int64)aRec[3]) << 32) + + (((sqlite3_int64)aRec[4]) << 24) + + (((sqlite3_int64)aRec[5]) << 16) + + (((sqlite3_int64)aRec[6]) << 8) + + (((sqlite3_int64)aRec[7]) << 0); +} + +/* +** This function is used to serialize the contents of value pValue (see +** comment titled "RECORD FORMAT" above). +** +** If it is non-NULL, the serialized form of the value is written to +** buffer aBuf. *pnWrite is set to the number of bytes written before +** returning. Or, if aBuf is NULL, the only thing this function does is +** set *pnWrite. +** +** If no error occurs, SQLITE_OK is returned. Or, if an OOM error occurs +** within a call to sqlite3_value_text() (may fail if the db is utf-16)) +** SQLITE_NOMEM is returned. +*/ +static int sessionSerializeValue( + u8 *aBuf, /* If non-NULL, write serialized value here */ + sqlite3_value *pValue, /* Value to serialize */ + int *pnWrite /* IN/OUT: Increment by bytes written */ +){ + int eType; + int nByte; + + eType = sqlite3_value_type(pValue); + if( aBuf ) aBuf[0] = eType; + + switch( eType ){ + case SQLITE_NULL: + nByte = 1; + break; + + case SQLITE_INTEGER: + case SQLITE_FLOAT: + if( aBuf ){ + /* TODO: SQLite does something special to deal with mixed-endian + ** floating point values (e.g. ARM7). This code probably should + ** too. */ + u64 i; + if( eType==SQLITE_INTEGER ){ + i = (u64)sqlite3_value_int64(pValue); + }else{ + double r; + assert( sizeof(double)==8 && sizeof(u64)==8 ); + r = sqlite3_value_double(pValue); + memcpy(&i, &r, 8); + } + aBuf[1] = (i>>56) & 0xFF; + aBuf[2] = (i>>48) & 0xFF; + aBuf[3] = (i>>40) & 0xFF; + aBuf[4] = (i>>32) & 0xFF; + aBuf[5] = (i>>24) & 0xFF; + aBuf[6] = (i>>16) & 0xFF; + aBuf[7] = (i>> 8) & 0xFF; + aBuf[8] = (i>> 0) & 0xFF; + } + nByte = 9; + break; + + case SQLITE_TEXT: + case SQLITE_BLOB: { + int n = sqlite3_value_bytes(pValue); + int nVarint = sessionVarintPut(0, n); + if( aBuf ){ + sessionVarintPut(&aBuf[1], n); + memcpy(&aBuf[nVarint + 1], eType==SQLITE_TEXT ? + sqlite3_value_text(pValue) : sqlite3_value_blob(pValue), n + ); + } + + nByte = 1 + nVarint + n; + break; + } + } + + *pnWrite += nByte; + return SQLITE_OK; +} + +/* +** Return the hash of iKey, assuming there are nBucket hash buckets in +** the hash table. +*/ +static int sessionKeyhash(int nBucket, sqlite3_int64 iKey){ + return (iKey % nBucket); +} + +/* +** If required, grow the hash table used to store changes on table pTab +** (part of the session pSession). If a fatal OOM error occurs, set the +** session object to failed and return SQLITE_ERROR. Otherwise, return +** SQLITE_OK. +** +** It is possible that a non-fatal OOM error occurs in this function. In +** that case the hash-table does not grow, but SQLITE_OK is returned anyway. +** Growing the hash table in this case is a performance optimization only, +** it is not required for correct operation. +*/ +static int sessionGrowHash(sqlite3_session *pSession, SessionTable *pTab){ + if( pTab->nChange==0 || pTab->nEntry>=(pTab->nChange/2) ){ + int i; + SessionChange **apNew; + int nNew = (pTab->nChange ? pTab->nChange : 128) * 2; + + apNew = (SessionChange **)sqlite3_malloc(sizeof(SessionChange *) * nNew); + if( apNew==0 ){ + if( pTab->nChange==0 ){ + pSession->rc = SQLITE_NOMEM; + return SQLITE_ERROR; + } + return SQLITE_OK; + } + memset(apNew, 0, sizeof(SessionChange *) * nNew); + + for(i=0; inChange; i++){ + SessionChange *p; + SessionChange *pNext; + for(p=pTab->apChange[i]; p; p=pNext){ + int iHash = sessionKeyhash(nNew, p->iKey); + pNext = p->pNext; + p->pNext = apNew[iHash]; + apNew[iHash] = p; + } + } + + sqlite3_free(pTab->apChange); + pTab->nChange = nNew; + pTab->apChange = apNew; + } + + return SQLITE_OK; +} + +static int sessionInitTable(sqlite3_session *pSession, SessionTable *pTab){ + if( pTab->nCol==0 ){ + pTab->nCol = sqlite3_preupdate_count(pSession->db); + } + + if( pTab->nCol!=sqlite3_preupdate_count(pSession->db) ){ + pSession->rc = SQLITE_SCHEMA; + return SQLITE_ERROR; + } + + return SQLITE_OK; +} + +/* +** The 'pre-update' hook registered by this module with SQLite databases. +*/ +static void xPreUpdate( + void *pCtx, /* Copy of third arg to preupdate_hook() */ + sqlite3 *db, /* Database handle */ + int op, /* SQLITE_UPDATE, DELETE or INSERT */ + char const *zDb, /* Database name */ + char const *zName, /* Table name */ + sqlite3_int64 iKey1, /* Rowid of row about to be deleted/updated */ + sqlite3_int64 iKey2 /* New rowid value (for a rowid UPDATE) */ +){ + sqlite3_session *pSession; + int nDb = strlen(zDb); + int nName = strlen(zDb); + + for(pSession=(sqlite3_session *)pCtx; pSession; pSession=pSession->pNext){ + SessionTable *pTab; + if( pSession->rc ) continue; + if( sqlite3_strnicmp(zDb, pSession->zDb, nDb+1) ) continue; + for(pTab=pSession->pTable; pTab; pTab=pTab->pNext){ + if( 0==sqlite3_strnicmp(pTab->zName, zName, nName+1) ){ + SessionChange *pChange; + SessionChange *pC; + int iHash; + int rc = SQLITE_OK; + + /* Load table details if required */ + if( sessionInitTable(pSession, pTab) ) return; + + /* Grow the hash table if required */ + if( sessionGrowHash(pSession, pTab) ) return; + + /* Search the hash table for an existing entry for rowid=iKey2. If + ** one is found, store a pointer to it in pChange and unlink it from + ** the hash table. Otherwise, set pChange to NULL. + */ + iHash = sessionKeyhash(pTab->nChange, iKey2); + for(pC=pTab->apChange[iHash]; pC; pC=pC->pNext){ + if( pC->iKey==iKey2 ) break; + } + if( pC ) continue; + + pTab->nEntry++; + + /* Create a new change object containing all the old values (if + ** this is an SQLITE_UPDATE or SQLITE_DELETE), or no record at + ** all (if this is an INSERT). */ + if( op==SQLITE_INSERT ){ + pChange = (SessionChange *)sqlite3_malloc(sizeof(SessionChange)); + if( pChange ){ + memset(pChange, 0, sizeof(SessionChange)); + } + }else{ + int nByte; /* Number of bytes to allocate */ + int i; /* Used to iterate through columns */ + sqlite3_value *pValue; + + /* Figure out how large an allocation is required */ + nByte = sizeof(SessionChange); + for(i=0; inCol && rc==SQLITE_OK; i++){ + rc = sqlite3_preupdate_old(pSession->db, i, &pValue); + if( rc==SQLITE_OK ){ + rc = sessionSerializeValue(0, pValue, &nByte); + } + } + + /* Allocate the change object */ + pChange = (SessionChange *)sqlite3_malloc(nByte); + if( !pChange ){ + rc = SQLITE_NOMEM; + }else{ + memset(pChange, 0, sizeof(SessionChange)); + pChange->aRecord = (u8 *)&pChange[1]; + } + + /* Populate the change object */ + nByte = 0; + for(i=0; inCol && rc==SQLITE_OK; i++){ + rc = sqlite3_preupdate_old(pSession->db, i, &pValue); + if( rc==SQLITE_OK ){ + rc = sessionSerializeValue( + &pChange->aRecord[nByte], pValue, &nByte); + } + } + pChange->nRecord = nByte; + } + + /* If an error has occurred, mark the session object as failed. */ + if( rc!=SQLITE_OK ){ + sqlite3_free(pChange); + pSession->rc = rc; + return; + } + + /* Add the change back to the hash-table */ + pChange->iKey = iKey2; + pChange->pNext = pTab->apChange[iHash]; + pTab->apChange[iHash] = pChange; + } + break; + } + } +} + +/* +** Create a session object. This session object will record changes to +** database zDb attached to connection db. +*/ +int sqlite3session_create( + sqlite3 *db, /* Database handle */ + const char *zDb, /* Name of db (e.g. "main") */ + sqlite3_session **ppSession /* OUT: New session object */ +){ + sqlite3_session *pNew; + sqlite3_session *pOld; + int nDb = strlen(zDb); /* Length of zDb in bytes */ + + *ppSession = 0; + + /* Allocate and populate the new session object. */ + pNew = (sqlite3_session *)sqlite3_malloc(sizeof(sqlite3_session) + nDb + 1); + if( !pNew ) return SQLITE_NOMEM; + memset(pNew, 0, sizeof(sqlite3_session)); + pNew->db = db; + pNew->zDb = (char *)&pNew[1]; + memcpy(pNew->zDb, zDb, nDb+1); + + /* Add the new session object to the linked list of session objects + ** attached to database handle $db. Do this under the cover of the db + ** handle mutex. */ + sqlite3_mutex_enter(sqlite3_db_mutex(db)); + pOld = (sqlite3_session*)sqlite3_preupdate_hook(db, xPreUpdate, (void*)pNew); + pNew->pNext = pOld; + sqlite3_mutex_leave(sqlite3_db_mutex(db)); + + *ppSession = pNew; + return SQLITE_OK; +} + +/* +** Delete a session object previously allocated using sqlite3session_create(). +*/ +void sqlite3session_delete(sqlite3_session *pSession){ + sqlite3 *db = pSession->db; + sqlite3_session *pHead; + sqlite3_session **pp; + + sqlite3_mutex_enter(sqlite3_db_mutex(db)); + pHead = (sqlite3_session*)sqlite3_preupdate_hook(db, 0, 0); + for(pp=&pHead; (*pp)!=pSession; pp=&((*pp)->pNext)); + *pp = (*pp)->pNext; + if( pHead ) sqlite3_preupdate_hook(db, xPreUpdate, (void *)pHead); + sqlite3_mutex_leave(sqlite3_db_mutex(db)); + + while( pSession->pTable ){ + int i; + SessionTable *pTab = pSession->pTable; + pSession->pTable = pTab->pNext; + for(i=0; inChange; i++){ + SessionChange *p; + SessionChange *pNext; + for(p=pTab->apChange[i]; p; p=pNext){ + pNext = p->pNext; + sqlite3_free(p); + } + } + sqlite3_free(pTab->apChange); + sqlite3_free(pTab); + } + + sqlite3_free(pSession); +} + +/* +** Attach a table to a session. All subsequent changes made to the table +** while the session object is enabled will be recorded. +** +** Only tables that have a PRIMARY KEY defined may be attached. It does +** not matter if the PRIMARY KEY is an "INTEGER PRIMARY KEY" (rowid alias) +** or not. +*/ +int sqlite3session_attach( + sqlite3_session *pSession, /* Session object */ + const char *zName /* Table name */ +){ + SessionTable *pTab; + int nName; + + /* First search for an existing entry. If one is found, this call is + ** a no-op. Return early. */ + nName = strlen(zName); + for(pTab=pSession->pTable; pTab; pTab=pTab->pNext){ + if( 0==sqlite3_strnicmp(pTab->zName, zName, nName+1) ){ + return SQLITE_OK; + } + } + + /* Allocate new SessionTable object. */ + pTab = (SessionTable *)sqlite3_malloc(sizeof(SessionTable) + nName + 1); + if( !pTab ) return SQLITE_NOMEM; + + /* Populate the new SessionTable object and link it into the list. */ + memset(pTab, 0, sizeof(SessionTable)); + pTab->zName = (char *)&pTab[1]; + memcpy(pTab->zName, zName, nName+1); + pTab->pNext = pSession->pTable; + pSession->pTable = pTab; + + return SQLITE_OK; +} + +typedef struct SessionBuffer SessionBuffer; +struct SessionBuffer { + u8 *aBuf; /* Pointer to changeset buffer */ + int nBuf; /* Size of buffer aBuf */ + int nAlloc; /* Size of allocation containing aBuf */ +}; + +static int sessionBufferGrow(SessionBuffer *p, int nByte, int *pRc){ + if( p->nAlloc-p->nBufnAlloc ? p->nAlloc : 128; + do { + nNew = nNew*2; + }while( nNew<(p->nAlloc+nByte) ); + + aNew = (u8 *)sqlite3_realloc(p->aBuf, nNew); + if( 0==aNew ){ + *pRc = SQLITE_NOMEM; + return 1; + } + p->aBuf = aNew; + p->nAlloc = nNew; + } + return 0; +} + +static void sessionAppendByte(SessionBuffer *p, u8 v, int *pRc){ + if( *pRc==SQLITE_OK && 0==sessionBufferGrow(p, 1, pRc) ){ + p->aBuf[p->nBuf++] = v; + } +} + +static void sessionAppendVarint(SessionBuffer *p, sqlite3_int64 v, int *pRc){ + if( *pRc==SQLITE_OK && 0==sessionBufferGrow(p, 9, pRc) ){ + p->nBuf += sessionVarintPut(&p->aBuf[p->nBuf], v); + } +} + +static void sessionAppendBlob( + SessionBuffer *p, + const u8 *aBlob, + int nBlob, + int *pRc +){ + if( *pRc==SQLITE_OK && 0==sessionBufferGrow(p, nBlob, pRc) ){ + memcpy(&p->aBuf[p->nBuf], aBlob, nBlob); + p->nBuf += nBlob; + } +} + +static void sessionAppendCol( + SessionBuffer *p, + sqlite3_stmt *pStmt, + int iCol, + int *pRc +){ + if( *pRc==SQLITE_OK ){ + int eType = sqlite3_column_type(pStmt, iCol); + sessionAppendByte(p, (u8)eType, pRc); + if( eType==SQLITE_INTEGER || eType==SQLITE_FLOAT ){ + sqlite3_int64 i; + u8 aBuf[8]; + if( eType==SQLITE_INTEGER ){ + i = sqlite3_column_int64(pStmt, iCol); + }else{ + double r = sqlite3_column_double(pStmt, iCol); + memcpy(&i, &r, 8); + } + aBuf[0] = (i>>56) & 0xFF; + aBuf[1] = (i>>48) & 0xFF; + aBuf[2] = (i>>40) & 0xFF; + aBuf[3] = (i>>32) & 0xFF; + aBuf[4] = (i>>24) & 0xFF; + aBuf[5] = (i>>16) & 0xFF; + aBuf[6] = (i>> 8) & 0xFF; + aBuf[7] = (i>> 0) & 0xFF; + sessionAppendBlob(p, aBuf, 8, pRc); + } + if( eType==SQLITE_BLOB || eType==SQLITE_TEXT ){ + int nByte = sqlite3_column_bytes(pStmt, iCol); + sessionAppendVarint(p, nByte, pRc); + sessionAppendBlob(p, eType==SQLITE_BLOB ? + sqlite3_column_blob(pStmt, iCol) : sqlite3_column_text(pStmt, iCol), + nByte, pRc + ); + } + } +} + +static void sessionAppendUpdate( + sqlite3_stmt *pStmt, + SessionBuffer *pBuf, + SessionChange *p, + int *pRc +){ + if( *pRc==SQLITE_OK ){ + SessionBuffer buf2 = {0, 0, 0}; + int bNoop = 1; + int i; + u8 *pCsr = p->aRecord; + sessionAppendByte(pBuf, SQLITE_UPDATE, pRc); + for(i=0; inBuf -= (1 + sqlite3_column_count(pStmt)); + }else{ + sessionAppendBlob(pBuf, buf2.aBuf, buf2.nBuf, pRc); + sqlite3_free(buf2.aBuf); + } + } + + +} + +/* +** Obtain a changeset object containing all changes recorded by the +** session object passed as the first argument. +** +** It is the responsibility of the caller to eventually free the buffer +** using sqlite3_free(). +*/ +int sqlite3session_changeset( + sqlite3_session *pSession, /* Session object */ + int *pnChangeset, /* OUT: Size of buffer at *ppChangeset */ + void **ppChangeset /* OUT: Buffer containing changeset */ +){ + sqlite3 *db = pSession->db; + SessionTable *pTab; + SessionBuffer buf = {0, 0, 0}; + int rc; + + *pnChangeset = 0; + *ppChangeset = 0; + rc = pSession->rc; + + for(pTab=pSession->pTable; rc==SQLITE_OK && pTab; pTab=pTab->pNext){ + if( pTab->nEntry ){ + int i; + sqlite3_stmt *pStmt = 0; + int bNoop = 1; + int nRewind = buf.nBuf; + + /* Write a table header */ + sessionAppendByte(&buf, 'T', &rc); + sessionAppendVarint(&buf, pTab->nCol, &rc); + sessionAppendBlob(&buf, (u8 *)pTab->zName, strlen(pTab->zName)+1, &rc); + + /* Build and compile a statement to execute: */ + if( rc==SQLITE_OK ){ + char *zSql = sqlite3_mprintf("SELECT * FROM %Q.%Q WHERE _rowid_ = ?", + pSession->zDb, pTab->zName + ); + if( !zSql ){ + rc = SQLITE_NOMEM; + }else{ + rc = sqlite3_prepare_v2(db, zSql, -1, &pStmt, 0); + } + sqlite3_free(zSql); + } + + if( rc==SQLITE_OK && pTab->nCol!=sqlite3_column_count(pStmt) ){ + rc = SQLITE_SCHEMA; + } + + for(i=0; inChange; i++){ + SessionChange *p; + for(p=pTab->apChange[i]; rc==SQLITE_OK && p; p=p->pNext){ + sqlite3_bind_int64(pStmt, 1, p->iKey); + if( sqlite3_step(pStmt)==SQLITE_ROW ){ + int iCol; + if( p->aRecord ){ + sessionAppendUpdate(pStmt, &buf, p, &rc); + }else{ + sessionAppendByte(&buf, SQLITE_INSERT, &rc); + for(iCol=0; iColnCol; iCol++){ + sessionAppendCol(&buf, pStmt, iCol, &rc); + } + } + bNoop = 0; + }else if( p->aRecord ){ + /* A DELETE change */ + sessionAppendByte(&buf, SQLITE_DELETE, &rc); + sessionAppendBlob(&buf, p->aRecord, p->nRecord, &rc); + bNoop = 0; + } + rc = sqlite3_reset(pStmt); + } + } + + sqlite3_finalize(pStmt); + + if( bNoop ){ + buf.nBuf = nRewind; + } + } + } + + if( rc==SQLITE_OK ){ + *pnChangeset = buf.nBuf; + *ppChangeset = buf.aBuf; + }else{ + sqlite3_free(buf.aBuf); + } + + return rc; +} + +int sqlite3session_enable(sqlite3_session *pSession, int bEnable){ + return bEnable; +} + +/************************************************************************/ +/************************************************************************/ +/************************************************************************/ + +struct sqlite3_changeset_iter { + u8 *aChangeset; /* Pointer to buffer containing changeset */ + int nChangeset; /* Number of bytes in aChangeset */ + u8 *pNext; /* Pointer to next change within aChangeset */ + int rc; + + char *zTab; /* Current table */ + int nCol; /* Number of columns in zTab */ + int op; /* Current operation */ + sqlite3_value **apValue; /* old.* and new.* values */ +}; + +/* +** Create an iterator used to iterate through the contents of a changeset. +*/ +int sqlite3changeset_start( + sqlite3_changeset_iter **ppIter, + int nChangeset, + void *pChangeset +){ + sqlite3_changeset_iter *pRet; /* Iterator to return */ + int nByte; /* Number of bytes to allocate for iterator */ + + *ppIter = 0; + + nByte = sizeof(sqlite3_changeset_iter); + pRet = (sqlite3_changeset_iter *)sqlite3_malloc(nByte); + if( !pRet ) return SQLITE_NOMEM; + memset(pRet, 0, sizeof(sqlite3_changeset_iter)); + + pRet->aChangeset = (u8 *)pChangeset; + pRet->nChangeset = nChangeset; + pRet->pNext = pRet->aChangeset; + + *ppIter = pRet; + return SQLITE_OK; +} + +static int sessionReadRecord( + u8 **paChange, /* IN/OUT: Pointer to binary record */ + int nCol, /* Number of values in record */ + sqlite3_value **apOut /* Write values to this array */ +){ + int i; + u8 *aRec = *paChange; + + for(i=0; irc!=SQLITE_OK ) return p->rc; + + if( p->apValue ){ + for(i=0; inCol*2; i++){ + sqlite3ValueFree(p->apValue[i]); + } + memset(p->apValue, 0, sizeof(sqlite3_value*)*p->nCol*2); + } + + /* If the iterator is already at the end of the changeset, return DONE. */ + if( p->pNext>=&p->aChangeset[p->nChangeset] ){ + return SQLITE_DONE; + } + aChange = p->pNext; + + c = *(aChange++); + if( c=='T' ){ + int nByte; /* Bytes to allocate for apValue */ + aChange += sessionVarintGet(aChange, &p->nCol); + p->zTab = (char *)aChange; + aChange += (strlen((char *)aChange) + 1); + p->op = *(aChange++); + sqlite3_free(p->apValue); + nByte = sizeof(sqlite3_value *) * p->nCol * 2; + p->apValue = (sqlite3_value **)sqlite3_malloc(nByte); + if( !p->apValue ){ + return (p->rc = SQLITE_NOMEM); + } + memset(p->apValue, 0, sizeof(sqlite3_value*)*p->nCol*2); + }else{ + p->op = c; + } + if( p->op!=SQLITE_UPDATE && p->op!=SQLITE_DELETE && p->op!=SQLITE_INSERT ){ + return (p->rc = SQLITE_CORRUPT); + } + + /* If this is an UPDATE or DELETE, read the old.* record. */ + if( p->op!=SQLITE_INSERT ){ + p->rc = sessionReadRecord(&aChange, p->nCol, p->apValue); + if( p->rc!=SQLITE_OK ) return p->rc; + } + + /* If this is an INSERT or UPDATE, read the new.* record. */ + if( p->op!=SQLITE_DELETE ){ + p->rc = sessionReadRecord(&aChange, p->nCol, &p->apValue[p->nCol]); + if( p->rc!=SQLITE_OK ) return p->rc; + } + + p->pNext = aChange; + return SQLITE_ROW; +} + +/* +** The following three functions extract information on the current change +** from a changeset iterator. They may only be called after changeset_next() +** has returned SQLITE_ROW. +*/ +int sqlite3changeset_op( + sqlite3_changeset_iter *pIter, + const char **pzTab, /* OUT: Pointer to table name */ + int *pnCol, /* OUT: Number of columns in table */ + int *pOp /* OUT: SQLITE_INSERT, DELETE or UPDATE */ +){ + *pOp = pIter->op; + *pnCol = pIter->nCol; + *pzTab = pIter->zTab; + return SQLITE_OK; +} + +int sqlite3changeset_old( + sqlite3_changeset_iter *pIter, + int iVal, + sqlite3_value **ppValue /* OUT: Old value (or NULL pointer) */ +){ + if( iVal<0 || iVal>=pIter->nCol ){ + return SQLITE_RANGE; + } + *ppValue = pIter->apValue[iVal]; + return SQLITE_OK; +} + +int sqlite3changeset_new( + sqlite3_changeset_iter *pIter, + int iVal, + sqlite3_value **ppValue /* OUT: New value (or NULL pointer) */ +){ + if( iVal<0 || iVal>=pIter->nCol ){ + return SQLITE_RANGE; + } + *ppValue = pIter->apValue[pIter->nCol+iVal]; + return SQLITE_OK; +} + +/* +** Finalize an iterator allocated with sqlite3changeset_start(). +** +** This function may not be called on iterators passed to a conflict handler +** callback by changeset_apply(). +*/ +int sqlite3changeset_finalize(sqlite3_changeset_iter *p){ + int i; + int rc = p->rc; + for(i=0; inCol*2; i++) sqlite3ValueFree(p->apValue[i]); + sqlite3_free(p->apValue); + sqlite3_free(p); + return rc; +} + +#endif /* #ifdef SQLITE_ENABLE_SESSION */ diff --git a/ext/session/sqlite3session.h b/ext/session/sqlite3session.h new file mode 100644 index 0000000000..a62b8b654e --- /dev/null +++ b/ext/session/sqlite3session.h @@ -0,0 +1,125 @@ + +#ifndef __SQLITESESSION_H_ +#define __SQLITESESSION_H_ 1 + +/* +** Make sure we can call this stuff from C++. +*/ +#ifdef __cplusplus +extern "C" { +#endif + +#include "sqlite3.h" + +typedef struct sqlite3_session sqlite3_session; +typedef struct sqlite3_changeset_iter sqlite3_changeset_iter; + +/* +** Create a session object. This session object will record changes to +** database zDb attached to connection db. +*/ +int sqlite3session_create( + sqlite3 *db, /* Database handle */ + const char *zDb, /* Name of db (e.g. "main") */ + sqlite3_session **ppSession /* OUT: New session object */ +); + +/* +** Enable or disable the recording of changes by a session object. When +** enabled, a session object records changes made to the database. When +** disabled - it does not. A newly created session object is enabled. +** +** Passing zero to this function disables the session. Passing a value +** greater than zero enables it. Passing a value less than zero is a +** no-op, and may be used to query the current state of the session. +** +** The return value indicates the final state of the session object: 0 if +** the session is disabled, or 1 if it is enabled. +*/ +int sqlite3session_enable(sqlite3_session *pSession, int bEnable); + +/* +** Attach a table to a session. All subsequent changes made to the table +** while the session object is enabled will be recorded. +** +** Only tables that have a PRIMARY KEY defined may be attached. It does +** not matter if the PRIMARY KEY is an "INTEGER PRIMARY KEY" (rowid alias) +** or not. +*/ +int sqlite3session_attach( + sqlite3_session *pSession, /* Session object */ + const char *zTab /* Table name */ +); + +/* +** Obtain a changeset object containing all changes recorded by the +** session object passed as the first argument. +** +** It is the responsibility of the caller to eventually free the buffer +** using sqlite3_free(). +*/ +int sqlite3session_changeset( + sqlite3_session *pSession, /* Session object */ + int *pnChangeset, /* OUT: Size of buffer at *ppChangeset */ + void **ppChangeset /* OUT: Buffer containing changeset */ +); + +/* +** Delete a session object previously allocated using sqlite3session_create(). +*/ +void sqlite3session_delete(sqlite3_session *pSession); + + +/* +** Create an iterator used to iterate through the contents of a changeset. +*/ +int sqlite3changeset_start( + sqlite3_changeset_iter **ppIter, + int nChangeset, + void *pChangeset +); + +/* +** Advance an iterator created by sqlite3changeset_start() to the next +** change in the changeset. This function may return SQLITE_ROW, SQLITE_DONE +** or SQLITE_CORRUPT. +** +** This function may not be called on iterators passed to a conflict handler +** callback by changeset_apply(). +*/ +int sqlite3changeset_next(sqlite3_changeset_iter *pIter); + +/* +** The following three functions extract information on the current change +** from a changeset iterator. They may only be called after changeset_next() +** has returned SQLITE_ROW. +*/ +int sqlite3changeset_op( + sqlite3_changeset_iter *pIter, /* Iterator object */ + const char **pzTab, /* OUT: Pointer to table name */ + int *pnCol, /* OUT: Number of columns in table */ + int *pOp /* OUT: SQLITE_INSERT, DELETE or UPDATE */ +); + +int sqlite3changeset_old( + sqlite3_changeset_iter *pIter, + int iVal, + sqlite3_value **ppValue /* OUT: Old value (or NULL pointer) */ +); + +int sqlite3changeset_new( + sqlite3_changeset_iter *pIter, + int iVal, + sqlite3_value **ppValue /* OUT: New value (or NULL pointer) */ +); + +/* +** Finalize an iterator allocated with sqlite3changeset_start(). +** +** This function may not be called on iterators passed to a conflict handler +** callback by changeset_apply(). +*/ +int sqlite3changeset_finalize(sqlite3_changeset_iter *pIter); + +#endif + diff --git a/ext/session/test_session.c b/ext/session/test_session.c new file mode 100644 index 0000000000..9c37fdb285 --- /dev/null +++ b/ext/session/test_session.c @@ -0,0 +1,258 @@ + +#if defined(SQLITE_TEST) && defined(SQLITE_ENABLE_SESSION) + +#include "sqlite3session.h" +#include +#include +#include + +static int test_session_error(Tcl_Interp *interp, int rc){ + extern const char *sqlite3TestErrorName(int); + Tcl_SetObjResult(interp, Tcl_NewStringObj(sqlite3TestErrorName(rc), -1)); + return TCL_ERROR; +} + +/* +** Tclcmd: $session attach TABLE +** $session changeset +** $session delete +** $session enable BOOL +*/ +static int test_session_cmd( + void *clientData, + Tcl_Interp *interp, + int objc, + Tcl_Obj *CONST objv[] +){ + sqlite3_session *pSession = (sqlite3_session *)clientData; + struct SessionSubcmd { + const char *zSub; + int nArg; + const char *zMsg; + int iSub; + } aSub[] = { + { "attach", 1, "TABLE", }, /* 0 */ + { "changeset", 0, "", }, /* 1 */ + { "delete", 0, "", }, /* 2 */ + { "enable", 1, "", }, /* 3 */ + { 0 } + }; + int iSub; + int rc; + + if( objc<2 ){ + Tcl_WrongNumArgs(interp, 1, objv, "SUBCOMMAND ..."); + return TCL_ERROR; + } + rc = Tcl_GetIndexFromObjStruct(interp, + objv[1], aSub, sizeof(aSub[0]), "sub-command", 0, &iSub + ); + if( rc!=TCL_OK ) return rc; + if( objc!=2+aSub[iSub].nArg ){ + Tcl_WrongNumArgs(interp, 2, objv, aSub[iSub].zMsg); + return TCL_ERROR; + } + + switch( iSub ){ + case 0: /* attach */ + rc = sqlite3session_attach(pSession, Tcl_GetString(objv[2])); + if( rc!=SQLITE_OK ){ + return test_session_error(interp, rc); + } + break; + + case 1: { /* changeset */ + int nChange; + void *pChange; + rc = sqlite3session_changeset(pSession, &nChange, &pChange); + if( rc==SQLITE_OK ){ + Tcl_SetObjResult(interp, Tcl_NewByteArrayObj(pChange, nChange)); + sqlite3_free(pChange); + }else{ + return test_session_error(interp, rc); + } + break; + } + + case 2: /* delete */ + Tcl_DeleteCommand(interp, Tcl_GetString(objv[0])); + break; + + case 3: { /* enable */ + int val; + if( Tcl_GetBooleanFromObj(interp, objv[2], &val) ) return TCL_ERROR; + val = sqlite3session_enable(pSession, val); + Tcl_SetObjResult(interp, Tcl_NewBooleanObj(val)); + break; + } + } + + return TCL_OK; +} + +static void test_session_del(void *clientData){ + sqlite3_session *pSession = (sqlite3_session *)clientData; + sqlite3session_delete(pSession); +} + +/* +** Tclcmd: sqlite3session CMD DB-HANDLE DB-NAME +*/ +static int test_sqlite3session( + void * clientData, + Tcl_Interp *interp, + int objc, + Tcl_Obj *CONST objv[] +){ + sqlite3 *db; + Tcl_CmdInfo info; + int rc; /* sqlite3session_create() return code */ + sqlite3_session *pSession; /* New session object */ + + if( objc!=4 ){ + Tcl_WrongNumArgs(interp, 1, objv, "CMD DB-HANDLE DB-NAME"); + return TCL_ERROR; + } + + if( 0==Tcl_GetCommandInfo(interp, Tcl_GetString(objv[2]), &info) ){ + Tcl_AppendResult(interp, "no such handle: ", Tcl_GetString(objv[2]), 0); + return TCL_ERROR; + } + db = *(sqlite3 **)info.objClientData; + + rc = sqlite3session_create(db, Tcl_GetString(objv[3]), &pSession); + if( rc!=SQLITE_OK ){ + return test_session_error(interp, rc); + } + + Tcl_CreateObjCommand( + interp, Tcl_GetString(objv[1]), test_session_cmd, (ClientData)pSession, + test_session_del + ); + Tcl_SetObjResult(interp, objv[1]); + return TCL_OK; +} + +static void test_append_value(Tcl_Obj *pList, sqlite3_value *pVal){ + if( pVal==0 ){ + Tcl_ListObjAppendElement(0, pList, Tcl_NewObj()); + Tcl_ListObjAppendElement(0, pList, Tcl_NewObj()); + }else{ + Tcl_Obj *pObj; + switch( sqlite3_value_type(pVal) ){ + case SQLITE_NULL: + Tcl_ListObjAppendElement(0, pList, Tcl_NewStringObj("n", 1)); + pObj = Tcl_NewObj(); + break; + case SQLITE_INTEGER: + Tcl_ListObjAppendElement(0, pList, Tcl_NewStringObj("i", 1)); + pObj = Tcl_NewWideIntObj(sqlite3_value_int64(pVal)); + break; + case SQLITE_FLOAT: + Tcl_ListObjAppendElement(0, pList, Tcl_NewStringObj("f", 1)); + pObj = Tcl_NewDoubleObj(sqlite3_value_double(pVal)); + break; + case SQLITE_TEXT: + Tcl_ListObjAppendElement(0, pList, Tcl_NewStringObj("t", 1)); + pObj = Tcl_NewStringObj((char *)sqlite3_value_text(pVal), -1); + break; + case SQLITE_BLOB: + Tcl_ListObjAppendElement(0, pList, Tcl_NewStringObj("b", 1)); + pObj = Tcl_NewByteArrayObj( + sqlite3_value_blob(pVal), + sqlite3_value_bytes(pVal) + ); + break; + } + Tcl_ListObjAppendElement(0, pList, pObj); + } +} + +/* +** sqlite3session_foreach VARNAME CHANGESET SCRIPT +*/ +static int test_sqlite3session_foreach( + void * clientData, + Tcl_Interp *interp, + int objc, + Tcl_Obj *CONST objv[] +){ + void *pChangeSet; + int nChangeSet; + sqlite3_changeset_iter *pIter; + int rc; + + if( objc!=4 ){ + Tcl_WrongNumArgs(interp, 1, objv, "VARNAME CHANGESET SCRIPT"); + return TCL_ERROR; + } + + pChangeSet = (void *)Tcl_GetByteArrayFromObj(objv[2], &nChangeSet); + rc = sqlite3changeset_start(&pIter, nChangeSet, pChangeSet); + if( rc!=SQLITE_OK ){ + return test_session_error(interp, rc); + } + + while( SQLITE_ROW==sqlite3changeset_next(pIter) ){ + int nCol; /* Number of columns in table */ + int op; /* SQLITE_INSERT, UPDATE or DELETE */ + const char *zTab; /* Name of table change applies to */ + Tcl_Obj *pVar; /* Tcl value to set $VARNAME to */ + Tcl_Obj *pOld; /* Vector of old.* values */ + Tcl_Obj *pNew; /* Vector of new.* values */ + + sqlite3changeset_op(pIter, &zTab, &nCol, &op); + pVar = Tcl_NewObj(); + Tcl_ListObjAppendElement(0, pVar, Tcl_NewStringObj( + op==SQLITE_INSERT ? "INSERT" : + op==SQLITE_UPDATE ? "UPDATE" : + "DELETE", -1 + )); + Tcl_ListObjAppendElement(0, pVar, Tcl_NewStringObj(zTab, -1)); + + pOld = Tcl_NewObj(); + if( op!=SQLITE_INSERT ){ + int i; + for(i=0; ipUnpacked==0 ){ - KeyInfo keyinfo; u32 nRecord; u8 *aRecord; - memset(&keyinfo, 0, sizeof(KeyInfo)); - keyinfo.db = db; - keyinfo.enc = ENC(db); - keyinfo.nField = p->pCsr->nField; - rc = sqlite3BtreeDataSize(p->pCsr->pCursor, &nRecord); if( rc!=SQLITE_OK ) goto preupdate_old_out; aRecord = sqlite3DbMallocRaw(db, nRecord); @@ -1370,7 +1364,7 @@ int sqlite3_preupdate_old(sqlite3 *db, int iIdx, sqlite3_value **ppValue){ goto preupdate_old_out; } - p->pUnpacked = sqlite3VdbeRecordUnpack(&keyinfo, nRecord, aRecord, 0, 0); + p->pUnpacked = sqlite3VdbeRecordUnpack(&p->keyinfo, nRecord, aRecord, 0, 0); p->aRecord = aRecord; } diff --git a/src/vdbeaux.c b/src/vdbeaux.c index 937c2a1998..cb6aefac08 100644 --- a/src/vdbeaux.c +++ b/src/vdbeaux.c @@ -3194,6 +3194,9 @@ void sqlite3VdbePreUpdateHook( preupdate.pCsr = pCsr; preupdate.op = op; + preupdate.keyinfo.db = db; + preupdate.keyinfo.enc = ENC(db); + preupdate.keyinfo.nField = pCsr->nField; db->pPreUpdate = &preupdate; db->xPreUpdateCallback(db->pPreUpdateArg, db, op, zDb, zTbl, iKey1, iKey2); db->pPreUpdate = 0; diff --git a/test/session1.test b/test/session1.test new file mode 100644 index 0000000000..0cb225a2fd --- /dev/null +++ b/test/session1.test @@ -0,0 +1,118 @@ +# 2011 March 07 +# +# The author disclaims copyright to this source code. In place of +# a legal notice, here is a blessing: +# +# May you do good and not evil. +# May you find forgiveness for yourself and forgive others. +# May you share freely, never taking more than you give. +# +#*********************************************************************** +# This file implements regression tests for SQLite library. +# + +set testdir [file dirname $argv0] +source $testdir/tester.tcl +set ::testprefix session1 + +do_execsql_test 1.0 { + CREATE TABLE t1(x, y); + INSERT INTO t1 VALUES('abc', 'def'); +} + +do_test 1.1 { sqlite3session S db main } {S} +do_test 1.2 { S delete } {} +do_test 1.3 { sqlite3session S db main } {S} +do_test 1.4 { S attach t1 } {} +do_test 1.5 { S delete } {} +do_test 1.6 { sqlite3session S db main } {S} +do_test 1.7 { S attach t1 ; S attach t2 ; S attach t3 } {} +do_test 1.8 { S attach t1 ; S attach t2 ; S attach t3 } {} +do_test 1.9 { S delete } {} +do_test 1.10 { + sqlite3session S db main + S attach t1 + execsql { INSERT INTO t1 VALUES('ghi', 'jkl') } +} {} +do_test 1.11 { S delete } {} +do_test 1.12 { + sqlite3session S db main + S attach t1 + execsql { INSERT INTO t1 VALUES('mno', 'pqr') } + execsql { UPDATE t1 SET x = 111 WHERE rowid = 1 } + execsql { DELETE FROM t1 WHERE rowid = 2 } +} {} +do_test 1.13 { + S changeset + S delete +} {} + +proc do_changeset_test {tn session res} { + set r [list] + foreach x $res {lappend r $x} + uplevel do_test $tn [list [subst -nocommands { + set x [list] + sqlite3session_foreach c [$session changeset] { lappend x [set c] } + set x + }]] [list $r] +} + +do_test 2.1.1 { + execsql { DELETE FROM t1 } + sqlite3session S db main + S attach t1 + execsql { INSERT INTO t1 VALUES(1, 'Sukhothai') } + execsql { INSERT INTO t1 VALUES(2, 'Ayutthaya') } + execsql { INSERT INTO t1 VALUES(3, 'Thonburi') } +} {} +do_changeset_test 2.1.2 S { + {INSERT t1 {} {i 1 t Sukhothai}} + {INSERT t1 {} {i 2 t Ayutthaya}} + {INSERT t1 {} {i 3 t Thonburi}} +} +do_test 2.1.3 { S delete } {} + +do_test 2.2.1 { + sqlite3session S db main + S attach t1 + execsql { DELETE FROM t1 WHERE 1 } +} {} +do_changeset_test 2.2.2 S { + {DELETE t1 {i 1 t Sukhothai} {}} + {DELETE t1 {i 2 t Ayutthaya} {}} + {DELETE t1 {i 3 t Thonburi} {}} +} +do_test 2.2.3 { S delete } {} + +do_test 2.3.1 { + sqlite3session S db main + execsql { INSERT INTO t1 VALUES(1, 'Sukhothai') } + execsql { INSERT INTO t1 VALUES(2, 'Ayutthaya') } + execsql { INSERT INTO t1 VALUES(3, 'Thonburi') } + S attach t1 + execsql { + UPDATE t1 SET x = 10 WHERE x = 1; + UPDATE t1 SET y = 'Surin' WHERE x = 2; + UPDATE t1 SET x = 20, y = 'Thapae' WHERE x = 3; + } +} {} + +do_changeset_test 2.3.2 S { + {UPDATE t1 {i 1 {} {}} {i 10 {} {}}} + {UPDATE t1 {{} {} t Ayutthaya} {{} {} t Surin}} + {UPDATE t1 {i 3 t Thonburi} {i 20 t Thapae}} +} +do_test 2.3.3 { S delete } {} + +do_test 2.4.1 { + sqlite3session S db main + S attach t1 + execsql { INSERT INTO t1 VALUES(100, 'Bangkok') } + execsql { DELETE FROM t1 WHERE x = 100 } +} {} + +breakpoint +do_changeset_test 2.4.2 S {} +do_test 2.4.3 { S delete } {} + +finish_test