1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-08-07 02:42:48 +03:00

Cause incremental-blob read/write operations lock shared-cache tables in the same way as normal SQL read/writes. Add complex assert statements to make sure tehe correct shared-cache locks are held when accessing the database. Eliminate some redundant checks from btree.c. (CVS 6830)

FossilOrigin-Name: f17ef37897da9bcaf20b5acdce6840522c0a0b16
This commit is contained in:
danielk1977
2009-06-29 06:00:37 +00:00
parent 1b1166b7c7
commit 96d48e963a
8 changed files with 241 additions and 154 deletions

View File

@@ -9,7 +9,7 @@
** May you share freely, never taking more than you give.
**
*************************************************************************
** $Id: btree.c,v 1.645 2009/06/26 16:32:13 shane Exp $
** $Id: btree.c,v 1.646 2009/06/29 06:00:37 danielk1977 Exp $
**
** This file implements a external (disk-based) database using BTrees.
** See the header comment on "btreeInt.h" for additional information.
@@ -67,11 +67,6 @@ int sqlite3_enable_shared_cache(int enable){
#endif
/*
** Forward declaration
*/
static int checkForReadConflicts(Btree*, Pgno, BtCursor*, i64);
#ifdef SQLITE_OMIT_SHARED_CACHE
/*
@@ -86,9 +81,113 @@ static int checkForReadConflicts(Btree*, Pgno, BtCursor*, i64);
#define querySharedCacheTableLock(a,b,c) SQLITE_OK
#define setSharedCacheTableLock(a,b,c) SQLITE_OK
#define clearAllSharedCacheTableLocks(a)
#define hasSharedCacheTableLock(a,b,c,d) 1
#define hasReadConflicts(a, b) 0
#endif
#ifndef SQLITE_OMIT_SHARED_CACHE
#ifdef SQLITE_DEBUG
/*
** This function is only used as part of an assert() statement. It checks
** that connection p holds the required locks to read or write to the
** b-tree with root page iRoot. If so, true is returned. Otherwise, false.
** For example, when writing to a table b-tree with root-page iRoot via
** Btree connection pBtree:
**
** assert( hasSharedCacheTableLock(pBtree, iRoot, 0, WRITE_LOCK) );
**
** When writing to an index b-tree that resides in a sharable database, the
** caller should have first obtained a lock specifying the root page of
** the corresponding table b-tree. This makes things a bit more complicated,
** as this module treats each b-tree as a separate structure. To determine
** the table b-tree corresponding to the index b-tree being written, this
** function has to search through the database schema.
**
** Instead of a lock on the b-tree rooted at page iRoot, the caller may
** hold a write-lock on the schema table (root page 1). This is also
** acceptable.
*/
static int hasSharedCacheTableLock(
Btree *pBtree, /* Handle that must hold lock */
Pgno iRoot, /* Root page of b-tree */
int isIndex, /* True if iRoot is the root of an index b-tree */
int eLockType /* Required lock type (READ_LOCK or WRITE_LOCK) */
){
Schema *pSchema = (Schema *)pBtree->pBt->pSchema;
Pgno iTab = 0;
BtLock *pLock;
/* If this b-tree database is not shareable, or if the client is reading
** and has the read-uncommitted flag set, then no lock is required.
** In these cases return true immediately. If the client is reading
** or writing an index b-tree, but the schema is not loaded, then return
** true also. In this case the lock is required, but it is too difficult
** to check if the client actually holds it. This doesn't happen very
** often. */
if( (pBtree->sharable==0)
|| (eLockType==READ_LOCK && (pBtree->db->flags & SQLITE_ReadUncommitted))
|| (isIndex && (!pSchema || (pSchema->flags&DB_SchemaLoaded)==0 ))
){
return 1;
}
/* Figure out the root-page that the lock should be held on. For table
** b-trees, this is just the root page of the b-tree being read or
** written. For index b-trees, it is the root page of the associated
** table. */
if( isIndex ){
HashElem *p;
for(p=sqliteHashFirst(&pSchema->idxHash); p; p=sqliteHashNext(p)){
Index *pIdx = (Index *)sqliteHashData(p);
if( pIdx->tnum==iRoot ){
iTab = pIdx->pTable->tnum;
}
}
}else{
iTab = iRoot;
}
/* Search for the required lock. Either a write-lock on root-page iTab, a
** write-lock on the schema table, or (if the client is reading) a
** read-lock on iTab will suffice. Return 1 if any of these are found. */
for(pLock=pBtree->pBt->pLock; pLock; pLock=pLock->pNext){
if( pLock->pBtree==pBtree
&& (pLock->iTable==iTab || (pLock->eLock==WRITE_LOCK && pLock->iTable==1))
&& pLock->eLock>=eLockType
){
return 1;
}
}
/* Failed to find the required lock. */
return 0;
}
/*
** This function is also used as part of assert() statements only. It
** returns true if there exist one or more cursors open on the table
** with root page iRoot that do not belong to either connection pBtree
** or some other connection that has the read-uncommitted flag set.
**
** For example, before writing to page iRoot:
**
** assert( !hasReadConflicts(pBtree, iRoot) );
*/
static int hasReadConflicts(Btree *pBtree, Pgno iRoot){
BtCursor *p;
for(p=pBtree->pBt->pCursor; p; p=p->pNext){
if( p->pgnoRoot==iRoot
&& p->pBtree!=pBtree
&& 0==(p->pBtree->db->flags & SQLITE_ReadUncommitted)
){
return 1;
}
}
return 0;
}
#endif /* #ifdef SQLITE_DEBUG */
/*
** Query to see if btree handle p may obtain a lock of type eLock
** (READ_LOCK or WRITE_LOCK) on the table with root-page iTab. Return
@@ -326,9 +425,41 @@ static void invalidateAllOverflowCache(BtShared *pBt){
invalidateOverflowCache(p);
}
}
/*
** This function is called before modifying the contents of a table
** b-tree to invalidate any incrblob cursors that are open on the
** row or one of the rows being modified. Argument pgnoRoot is the
** root-page of the table b-tree.
**
** If argument isClearTable is true, then the entire contents of the
** table is about to be deleted. In this case invalidate all incrblob
** cursors open on any row within the table with root-page pgnoRoot.
**
** Otherwise, if argument isClearTable is false, then the row with
** rowid iRow is being replaced or deleted. In this case invalidate
** only those incrblob cursors open on this specific row.
*/
static void invalidateIncrblobCursors(
Btree *pBtree, /* The database file to check */
Pgno pgnoRoot, /* Look for read cursors on this btree */
i64 iRow, /* The rowid that might be changing */
int isClearTable /* True if all rows are being deleted */
){
BtCursor *p;
BtShared *pBt = pBtree->pBt;
assert( sqlite3BtreeHoldsMutex(pBtree) );
for(p=pBt->pCursor; p; p=p->pNext){
if( p->isIncrblobHandle && (isClearTable || p->info.nKey==iRow) ){
p->eState = CURSOR_INVALID;
}
}
}
#else
#define invalidateOverflowCache(x)
#define invalidateAllOverflowCache(x)
#define invalidateIncrblobCursors(w,x,y,z)
#endif
/*
@@ -3055,16 +3186,22 @@ static int btreeCursor(
assert( sqlite3BtreeHoldsMutex(p) );
assert( wrFlag==0 || wrFlag==1 );
if( wrFlag ){
assert( !pBt->readOnly );
if( NEVER(pBt->readOnly) ){
return SQLITE_READONLY;
}
rc = checkForReadConflicts(p, iTable, 0, 0);
if( rc!=SQLITE_OK ){
assert( rc==SQLITE_LOCKED_SHAREDCACHE );
return rc;
}
/* The following assert statements verify that if this is a sharable b-tree
** database, the connection is holding the required table locks, and that
** no other connection has any open cursor that conflicts with this lock.
**
** The exception to this is read-only cursors open on the schema table.
** Such a cursor is opened without a lock while reading the database
** schema. This is safe because BtShared.mutex is held for the entire
** lifetime of this cursor. */
assert( (iTable==1 && wrFlag==0)
|| hasSharedCacheTableLock(p, iTable, pKeyInfo!=0, wrFlag+1)
);
assert( wrFlag==0 || !hasReadConflicts(p, iTable) );
if( NEVER(wrFlag && pBt->readOnly) ){
return SQLITE_READONLY;
}
if( pBt->pPage1==0 ){
@@ -6210,75 +6347,6 @@ static int balance(BtCursor *pCur){
return rc;
}
/*
** This routine checks all cursors that point to table pgnoRoot.
** If any of those cursors were opened with wrFlag==0 in a different
** database connection (a database connection that shares the pager
** cache with the current connection) and that other connection
** is not in the ReadUncommmitted state, then this routine returns
** SQLITE_LOCKED.
**
** As well as cursors with wrFlag==0, cursors with
** isIncrblobHandle==1 are also considered 'read' cursors because
** incremental blob cursors are used for both reading and writing.
**
** When pgnoRoot is the root page of an intkey table, this function is also
** responsible for invalidating incremental blob cursors when the table row
** on which they are opened is deleted or modified. Cursors are invalidated
** according to the following rules:
**
** 1) When BtreeClearTable() is called to completely delete the contents
** of a B-Tree table, pExclude is set to zero and parameter iRow is
** set to non-zero. In this case all incremental blob cursors open
** on the table rooted at pgnoRoot are invalidated.
**
** 2) When BtreeInsert(), BtreeDelete() or BtreePutData() is called to
** modify a table row via an SQL statement, pExclude is set to the
** write cursor used to do the modification and parameter iRow is set
** to the integer row id of the B-Tree entry being modified. Unless
** pExclude is itself an incremental blob cursor, then all incremental
** blob cursors open on row iRow of the B-Tree are invalidated.
**
** 3) If both pExclude and iRow are set to zero, no incremental blob
** cursors are invalidated.
*/
static int checkForReadConflicts(
Btree *pBtree, /* The database file to check */
Pgno pgnoRoot, /* Look for read cursors on this btree */
BtCursor *pExclude, /* Ignore this cursor */
i64 iRow /* The rowid that might be changing */
){
BtCursor *p;
BtShared *pBt = pBtree->pBt;
sqlite3 *db = pBtree->db;
assert( sqlite3BtreeHoldsMutex(pBtree) );
for(p=pBt->pCursor; p; p=p->pNext){
if( p==pExclude ) continue;
if( p->pgnoRoot!=pgnoRoot ) continue;
#ifndef SQLITE_OMIT_INCRBLOB
if( p->isIncrblobHandle && (
(!pExclude && iRow)
|| (pExclude && !pExclude->isIncrblobHandle && p->info.nKey==iRow)
)){
p->eState = CURSOR_INVALID;
}
#endif
if( p->eState!=CURSOR_VALID ) continue;
if( p->wrFlag==0
#ifndef SQLITE_OMIT_INCRBLOB
|| p->isIncrblobHandle
#endif
){
sqlite3 *dbOther = p->pBtree->db;
assert(dbOther);
if( dbOther!=db && (dbOther->flags & SQLITE_ReadUncommitted)==0 ){
sqlite3ConnectionBlocked(db, dbOther);
return SQLITE_LOCKED_SHAREDCACHE;
}
}
}
return SQLITE_OK;
}
/*
** Insert a new record into the BTree. The key is given by (pKey,nKey)
@@ -6322,12 +6390,15 @@ int sqlite3BtreeInsert(
assert( pBt->inTransaction==TRANS_WRITE );
assert( !pBt->readOnly );
assert( pCur->wrFlag );
rc = checkForReadConflicts(pCur->pBtree, pCur->pgnoRoot, pCur, nKey);
if( rc ){
/* The table pCur points to has a read lock */
assert( rc==SQLITE_LOCKED_SHAREDCACHE );
return rc;
assert( hasSharedCacheTableLock(p, pCur->pgnoRoot, pCur->pKeyInfo!=0, 2) );
/* If this is an insert into a table b-tree, invalidate any incrblob
** cursors open on the row being replaced (assuming this is a replace
** operation - if it is not, the following is a no-op). */
if( pCur->pKeyInfo==0 ){
invalidateIncrblobCursors(p, pCur->pgnoRoot, nKey, 0);
}
if( pCur->eState==CURSOR_FAULT ){
return pCur->skip;
}
@@ -6448,16 +6519,19 @@ int sqlite3BtreeDelete(BtCursor *pCur){
assert( pBt->inTransaction==TRANS_WRITE );
assert( !pBt->readOnly );
assert( pCur->wrFlag );
assert( hasSharedCacheTableLock(p, pCur->pgnoRoot, pCur->pKeyInfo!=0, 2) );
assert( !hasReadConflicts(p, pCur->pgnoRoot) );
if( NEVER(pCur->aiIdx[pCur->iPage]>=pCur->apPage[pCur->iPage]->nCell)
|| NEVER(pCur->eState!=CURSOR_VALID)
){
return SQLITE_ERROR; /* Something has gone awry. */
}
rc = checkForReadConflicts(p, pCur->pgnoRoot, pCur, pCur->info.nKey);
if( rc!=SQLITE_OK ){
assert( rc==SQLITE_LOCKED_SHAREDCACHE );
return rc; /* The table pCur points to has a read lock */
/* If this is a delete operation to remove a row from a table b-tree,
** invalidate any incrblob cursors open on the row being deleted. */
if( pCur->pKeyInfo==0 ){
invalidateIncrblobCursors(p, pCur->pgnoRoot, pCur->info.nKey, 0);
}
iCellDepth = pCur->iPage;
@@ -6757,11 +6831,13 @@ int sqlite3BtreeClearTable(Btree *p, int iTable, int *pnChange){
BtShared *pBt = p->pBt;
sqlite3BtreeEnter(p);
assert( p->inTrans==TRANS_WRITE );
if( (rc = checkForReadConflicts(p, iTable, 0, 1))!=SQLITE_OK ){
/* nothing to do */
}else if( SQLITE_OK!=(rc = saveAllCursors(pBt, iTable, 0)) ){
/* nothing to do */
}else{
/* Invalidate all incrblob cursors open on table iTable (assuming iTable
** is the root of a table b-tree - if it is not, the following call is
** a no-op). */
invalidateIncrblobCursors(p, iTable, 0, 1);
if( SQLITE_OK==(rc = saveAllCursors(pBt, (Pgno)iTable, 0)) ){
rc = clearDatabasePage(pBt, (Pgno)iTable, 0, pnChange);
}
sqlite3BtreeLeave(p);
@@ -7687,11 +7763,9 @@ int sqlite3BtreeLockTable(Btree *p, int iTab, u8 isWriteLock){
** to change the length of the data stored.
*/
int sqlite3BtreePutData(BtCursor *pCsr, u32 offset, u32 amt, void *z){
int rc;
assert( cursorHoldsMutex(pCsr) );
assert( sqlite3_mutex_held(pCsr->pBtree->db->mutex) );
assert(pCsr->isIncrblobHandle);
assert( pCsr->isIncrblobHandle );
restoreCursorPosition(pCsr);
assert( pCsr->eState!=CURSOR_REQUIRESEEK );
@@ -7707,14 +7781,10 @@ int sqlite3BtreePutData(BtCursor *pCsr, u32 offset, u32 amt, void *z){
if( !pCsr->wrFlag ){
return SQLITE_READONLY;
}
assert( !pCsr->pBt->readOnly
&& pCsr->pBt->inTransaction==TRANS_WRITE );
rc = checkForReadConflicts(pCsr->pBtree, pCsr->pgnoRoot, pCsr, 0);
if( rc!=SQLITE_OK ){
/* The table pCur points to has a read lock */
assert( rc==SQLITE_LOCKED_SHAREDCACHE );
return rc;
}
assert( !pCsr->pBt->readOnly && pCsr->pBt->inTransaction==TRANS_WRITE );
assert( hasSharedCacheTableLock(pCsr->pBtree, pCsr->pgnoRoot, 0, 2) );
assert( !hasReadConflicts(pCsr->pBtree, pCsr->pgnoRoot) );
if( pCsr->eState==CURSOR_INVALID || !pCsr->apPage[pCsr->iPage]->intKey ){
return SQLITE_ERROR;
}