1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-08-05 15:55:57 +03:00

Relax the locking requirements on BTree cursors. Any number of read and

write cursors can be open at the same time now, but a write cannot occur
as long as one or more read cursors are open.

Before this change, one or more read cursors could be open on a table,
or a single write cursor, but not both.  Both policies have the same
desirable effect: they prevent writes to a table while a sequential scan
of that table is underway.  But the new policy is a little less restrictive.
Both policies prevent an UPDATE from occurring inside a SELECT (which is
what we want) but the new policy allows a SELECT to occur inside an
UPDATE. (CVS 739)

FossilOrigin-Name: 8c2a0836980341faa479cfe6c716409e6057367d
This commit is contained in:
drh
2002-09-01 23:20:45 +00:00
parent c2eef3b32b
commit f74b8d9b89
4 changed files with 118 additions and 65 deletions

View File

@@ -9,7 +9,7 @@
** May you share freely, never taking more than you give.
**
*************************************************************************
** $Id: btree.c,v 1.71 2002/08/15 13:50:49 drh Exp $
** $Id: btree.c,v 1.72 2002/09/01 23:20:45 drh Exp $
**
** This file implements a external (disk-based) database using BTrees.
** For a detailed discussion of BTrees, refer to
@@ -345,7 +345,6 @@ struct Btree {
u8 inCkpt; /* True if there is a checkpoint on the transaction */
u8 readOnly; /* True if the underlying file is readonly */
u8 needSwab; /* Need to byte-swapping */
Hash locks; /* Key: root page number. Data: lock count */
};
typedef Btree Bt;
@@ -357,6 +356,7 @@ typedef Btree Bt;
struct BtCursor {
Btree *pBt; /* The Btree to which this cursor belongs */
BtCursor *pNext, *pPrev; /* Forms a linked list of all cursors */
BtCursor *pShared; /* Loop of cursors with the same root page */
Pgno pgnoRoot; /* The root page of this tree */
MemPage *pPage; /* Page that contains the entry */
int idx; /* Index of the entry in pPage->apCell[] */
@@ -682,7 +682,6 @@ int sqliteBtreeOpen(
pBt->pCursor = 0;
pBt->page1 = 0;
pBt->readOnly = sqlitepager_isreadonly(pBt->pPager);
sqliteHashInit(&pBt->locks, SQLITE_HASH_INT, 0);
*ppBtree = pBt;
return SQLITE_OK;
}
@@ -695,7 +694,6 @@ int sqliteBtreeClose(Btree *pBt){
sqliteBtreeCloseCursor(pBt->pCursor);
}
sqlitepager_close(pBt->pPager);
sqliteHashClear(&pBt->locks);
sqliteFree(pBt);
return SQLITE_OK;
}
@@ -825,19 +823,16 @@ static int newDatabase(Btree *pBt){
int sqliteBtreeBeginTrans(Btree *pBt){
int rc;
if( pBt->inTrans ) return SQLITE_ERROR;
if( pBt->readOnly ) return SQLITE_READONLY;
if( pBt->page1==0 ){
rc = lockBtree(pBt);
if( rc!=SQLITE_OK ){
return rc;
}
}
if( pBt->readOnly ){
rc = SQLITE_OK;
}else{
rc = sqlitepager_begin(pBt->page1);
if( rc==SQLITE_OK ){
rc = newDatabase(pBt);
}
rc = sqlitepager_begin(pBt->page1);
if( rc==SQLITE_OK ){
rc = newDatabase(pBt);
}
if( rc==SQLITE_OK ){
pBt->inTrans = 1;
@@ -856,7 +851,6 @@ int sqliteBtreeBeginTrans(Btree *pBt){
*/
int sqliteBtreeCommit(Btree *pBt){
int rc;
if( pBt->inTrans==0 ) return SQLITE_ERROR;
rc = pBt->readOnly ? SQLITE_OK : sqlitepager_commit(pBt->pPager);
pBt->inTrans = 0;
pBt->inCkpt = 0;
@@ -903,7 +897,7 @@ int sqliteBtreeRollback(Btree *pBt){
int sqliteBtreeBeginCkpt(Btree *pBt){
int rc;
if( !pBt->inTrans || pBt->inCkpt ){
return SQLITE_ERROR;
return pBt->readOnly ? SQLITE_READONLY : SQLITE_ERROR;
}
rc = pBt->readOnly ? SQLITE_OK : sqlitepager_ckpt_begin(pBt->pPager);
pBt->inCkpt = 1;
@@ -955,20 +949,39 @@ int sqliteBtreeRollbackCkpt(Btree *pBt){
** the database file.
**
** If wrFlag==0, then the cursor can only be used for reading.
** If wrFlag==1, then the cursor can be used for reading or writing.
** A read/write cursor requires exclusive access to its table. There
** cannot be two or more cursors open on the same table if any one of
** cursors is a read/write cursor. But there can be two or more
** read-only cursors open on the same table.
** If wrFlag==1, then the cursor can be used for reading or for
** writing if other conditions for writing are also met. These
** are the conditions that must be met in order for writing to
** be allowed:
**
** 1: The cursor must have been opened with wrFlag==1
**
** 2: No other cursors may be open with wrFlag==0 on the same table
**
** 3: The database must be writable (not on read-only media)
**
** 4: There must be an active transaction.
**
** Condition 2 warrants further discussion. If any cursor is opened
** on a table with wrFlag==0, that prevents all other cursors from
** writing to that table. This is a kind of "read-lock". When a cursor
** is opened with wrFlag==0 it is guaranteed that the table will not
** change as long as the cursor is open. This allows the cursor to
** do a sequential scan of the table without having to worry about
** entries being inserted or deleted during the scan. Cursors should
** be opened with wrFlag==0 only if this read-lock property is needed.
** That is to say, cursors should be opened with wrFlag==0 only if they
** intend to use the sqliteBtreeNext() system call. All other cursors
** should be opened with wrFlag==1 even if they never really intend
** to write.
**
** No checking is done to make sure that page iTable really is the
** root page of a b-tree. If it is not, then the cursor acquired
** will not work correctly.
*/
int sqliteBtreeCursor(Btree *pBt, int iTable, int wrFlag, BtCursor **ppCur){
int rc;
BtCursor *pCur;
ptr nLock;
BtCursor *pCur, *pRing;
if( pBt->page1==0 ){
rc = lockBtree(pBt);
@@ -977,10 +990,6 @@ int sqliteBtreeCursor(Btree *pBt, int iTable, int wrFlag, BtCursor **ppCur){
return rc;
}
}
if( wrFlag && pBt->readOnly ){
*ppCur = 0;
return SQLITE_READONLY;
}
pCur = sqliteMalloc( sizeof(*pCur) );
if( pCur==0 ){
rc = SQLITE_NOMEM;
@@ -995,13 +1004,6 @@ int sqliteBtreeCursor(Btree *pBt, int iTable, int wrFlag, BtCursor **ppCur){
if( rc!=SQLITE_OK ){
goto create_cursor_exception;
}
nLock = (ptr)sqliteHashFind(&pBt->locks, 0, iTable);
if( nLock<0 || (nLock>0 && wrFlag) ){
rc = SQLITE_LOCKED;
goto create_cursor_exception;
}
nLock = wrFlag ? -1 : nLock+1;
sqliteHashInsert(&pBt->locks, 0, iTable, (void*)nLock);
pCur->pBt = pBt;
pCur->wrFlag = wrFlag;
pCur->idx = 0;
@@ -1010,6 +1012,14 @@ int sqliteBtreeCursor(Btree *pBt, int iTable, int wrFlag, BtCursor **ppCur){
pCur->pNext->pPrev = pCur;
}
pCur->pPrev = 0;
pRing = pBt->pCursor;
while( pRing && pRing->pgnoRoot!=pCur->pgnoRoot ){ pRing = pRing->pNext; }
if( pRing ){
pCur->pShared = pRing->pShared;
pRing->pShared = pCur;
}else{
pCur->pShared = pCur;
}
pBt->pCursor = pCur;
*ppCur = pCur;
return SQLITE_OK;
@@ -1029,7 +1039,6 @@ create_cursor_exception:
** when the last cursor is closed.
*/
int sqliteBtreeCloseCursor(BtCursor *pCur){
ptr nLock;
Btree *pBt = pCur->pBt;
if( pCur->pPrev ){
pCur->pPrev->pNext = pCur->pNext;
@@ -1042,11 +1051,12 @@ int sqliteBtreeCloseCursor(BtCursor *pCur){
if( pCur->pPage ){
sqlitepager_unref(pCur->pPage);
}
if( pCur->pShared!=pCur ){
BtCursor *pRing = pCur->pShared;
while( pRing->pShared!=pCur ){ pRing = pRing->pShared; }
pRing->pShared = pCur->pShared;
}
unlockBtreeIfUnused(pBt);
nLock = (ptr)sqliteHashFind(&pBt->locks, 0, pCur->pgnoRoot);
assert( nLock!=0 || sqlite_malloc_failed );
nLock = nLock<0 ? 0 : nLock-1;
sqliteHashInsert(&pBt->locks, 0, pCur->pgnoRoot, (void*)nLock);
sqliteFree(pCur);
return SQLITE_OK;
}
@@ -2388,6 +2398,35 @@ balance_cleanup:
return rc;
}
/*
** This routine checks all cursors that point to the same table
** as pCur points to. If any of those cursors were opened with
** wrFlag==0 then this routine returns SQLITE_LOCKED. If all
** cursors point to the same table were opened with wrFlag==1
** then this routine returns SQLITE_OK.
**
** In addition to checking for read-locks (where a read-lock
** means a cursor opened with wrFlag==0) this routine also moves
** all cursors other than pCur so that they are pointing to the
** first Cell on root page. This is necessary because an insert
** or delete might change the number of cells on a page or delete
** a page entirely and we do not want to leave any cursors
** pointing to non-existant pages or cells.
*/
static int checkReadLocks(BtCursor *pCur){
BtCursor *p;
assert( pCur->wrFlag );
for(p=pCur->pShared; p!=pCur; p=p->pShared){
assert( p );
assert( p->pgnoRoot==pCur->pgnoRoot );
if( p->wrFlag==0 ) return SQLITE_LOCKED;
if( sqlitepager_pagenumber(p->pPage)!=p->pgnoRoot ){
moveToRoot(p);
}
}
return SQLITE_OK;
}
/*
** Insert a new record into the BTree. The key is given by (pKey,nKey)
** and the data is given by (pData,nData). The cursor is used only to
@@ -2409,12 +2448,17 @@ int sqliteBtreeInsert(
if( pCur->pPage==0 ){
return SQLITE_ABORT; /* A rollback destroyed this cursor */
}
if( !pCur->pBt->inTrans || nKey+nData==0 ){
return SQLITE_ERROR; /* Must start a transaction first */
if( !pBt->inTrans || nKey+nData==0 ){
/* Must start a transaction before doing an insert */
return pBt->readOnly ? SQLITE_READONLY : SQLITE_ERROR;
}
assert( !pBt->readOnly );
if( !pCur->wrFlag ){
return SQLITE_PERM; /* Cursor not open for writing */
}
if( checkReadLocks(pCur) ){
return SQLITE_LOCKED; /* The table pCur points to has a read lock */
}
rc = sqliteBtreeMoveto(pCur, pKey, nKey, &loc);
if( rc ) return rc;
pPage = pCur->pPage;
@@ -2463,15 +2507,20 @@ int sqliteBtreeDelete(BtCursor *pCur){
if( pCur->pPage==0 ){
return SQLITE_ABORT; /* A rollback destroyed this cursor */
}
if( !pCur->pBt->inTrans ){
return SQLITE_ERROR; /* Must start a transaction first */
if( !pBt->inTrans ){
/* Must start a transaction before doing a delete */
return pBt->readOnly ? SQLITE_READONLY : SQLITE_ERROR;
}
assert( !pBt->readOnly );
if( pCur->idx >= pPage->nCell ){
return SQLITE_ERROR; /* The cursor is not pointing to anything */
}
if( !pCur->wrFlag ){
return SQLITE_PERM; /* Did not open this cursor for writing */
}
if( checkReadLocks(pCur) ){
return SQLITE_LOCKED; /* The table pCur points to has a read lock */
}
rc = sqlitepager_write(pPage);
if( rc ) return rc;
pCell = pPage->apCell[pCur->idx];
@@ -2538,7 +2587,8 @@ int sqliteBtreeCreateTable(Btree *pBt, int *piTable){
Pgno pgnoRoot;
int rc;
if( !pBt->inTrans ){
return SQLITE_ERROR; /* Must start a transaction first */
/* Must start a transaction first */
return pBt->readOnly ? SQLITE_READONLY : SQLITE_ERROR;
}
if( pBt->readOnly ){
return SQLITE_READONLY;
@@ -2610,16 +2660,15 @@ static int clearDatabasePage(Btree *pBt, Pgno pgno, int freePageFlag){
*/
int sqliteBtreeClearTable(Btree *pBt, int iTable){
int rc;
ptr nLock;
BtCursor *pCur;
if( !pBt->inTrans ){
return SQLITE_ERROR; /* Must start a transaction first */
return pBt->readOnly ? SQLITE_READONLY : SQLITE_ERROR;
}
if( pBt->readOnly ){
return SQLITE_READONLY;
}
nLock = (ptr)sqliteHashFind(&pBt->locks, 0, iTable);
if( nLock ){
return SQLITE_LOCKED;
for(pCur=pBt->pCursor; pCur; pCur=pCur->pNext){
if( pCur->pgnoRoot==(Pgno)iTable ){
if( pCur->wrFlag==0 ) return SQLITE_LOCKED;
moveToRoot(pCur);
}
}
rc = clearDatabasePage(pBt, (Pgno)iTable, 0);
if( rc ){
@@ -2636,11 +2685,14 @@ int sqliteBtreeClearTable(Btree *pBt, int iTable){
int sqliteBtreeDropTable(Btree *pBt, int iTable){
int rc;
MemPage *pPage;
BtCursor *pCur;
if( !pBt->inTrans ){
return SQLITE_ERROR; /* Must start a transaction first */
return pBt->readOnly ? SQLITE_READONLY : SQLITE_ERROR;
}
if( pBt->readOnly ){
return SQLITE_READONLY;
for(pCur=pBt->pCursor; pCur; pCur=pCur->pNext){
if( pCur->pgnoRoot==(Pgno)iTable ){
return SQLITE_LOCKED; /* Cannot drop a table that has a cursor */
}
}
rc = sqlitepager_get(pBt->pPager, (Pgno)iTable, (void**)&pPage);
if( rc ) return rc;
@@ -2680,10 +2732,7 @@ int sqliteBtreeUpdateMeta(Btree *pBt, int *aMeta){
PageOne *pP1;
int rc, i;
if( !pBt->inTrans ){
return SQLITE_ERROR; /* Must start a transaction first */
}
if( pBt->readOnly ){
return SQLITE_READONLY;
return pBt->readOnly ? SQLITE_READONLY : SQLITE_ERROR;
}
pP1 = pBt->page1;
rc = sqlitepager_write(pP1);