1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-08-07 02:42:48 +03:00

In shared-cache mode, make sure the busy hander invoked is the

busy handler associated with the database connection that caused
the lock contention in the first place. (CVS 4598)

FossilOrigin-Name: c9eb65912f61ce0a6b66fe253652a1827e46b12a
This commit is contained in:
drh
2007-12-07 18:55:28 +00:00
parent b9175aed38
commit e5fe690d75
10 changed files with 110 additions and 85 deletions

View File

@@ -9,7 +9,7 @@
** May you share freely, never taking more than you give.
**
*************************************************************************
** $Id: btree.c,v 1.431 2007/11/28 16:19:56 drh Exp $
** $Id: btree.c,v 1.432 2007/12/07 18:55:28 drh Exp $
**
** This file implements a external (disk-based) database using BTrees.
** See the header comment on "btreeInt.h" for additional information.
@@ -118,8 +118,8 @@ static int queryTableLock(Btree *p, Pgno iTab, u8 eLock){
** write-cursor does not change.
*/
if(
!p->pSqlite ||
0==(p->pSqlite->flags&SQLITE_ReadUncommitted) ||
!p->db ||
0==(p->db->flags&SQLITE_ReadUncommitted) ||
eLock==WRITE_LOCK ||
iTab==MASTER_ROOT
){
@@ -163,8 +163,8 @@ static int lockTable(Btree *p, Pgno iTable, u8 eLock){
** the ReadUncommitted flag.
*/
if(
(p->pSqlite) &&
(p->pSqlite->flags&SQLITE_ReadUncommitted) &&
(p->db) &&
(p->db->flags&SQLITE_ReadUncommitted) &&
(eLock==READ_LOCK) &&
iTable!=MASTER_ROOT
){
@@ -1098,6 +1098,16 @@ static void pageReinit(DbPage *pData, int pageSize){
}
}
/*
** Invoke the busy handler for a btree.
*/
static int sqlite3BtreeInvokeBusyHandler(void *pArg, int n){
BtShared *pBt = (BtShared*)pArg;
assert( pBt->db );
assert( sqlite3_mutex_held(pBt->db->mutex) );
return sqlite3InvokeBusyHandler(&pBt->db->busyHandler);
}
/*
** Open a database file.
**
@@ -1109,7 +1119,7 @@ static void pageReinit(DbPage *pData, int pageSize){
*/
int sqlite3BtreeOpen(
const char *zFilename, /* Name of the file containing the BTree database */
sqlite3 *pSqlite, /* Associated database handle */
sqlite3 *db, /* Associated database handle */
Btree **ppBtree, /* Pointer to new Btree object written here */
int flags, /* Options */
int vfsFlags /* Flags passed through to sqlite3_vfs.xOpen() */
@@ -1134,16 +1144,16 @@ int sqlite3BtreeOpen(
#endif
#endif
assert( pSqlite!=0 );
assert( sqlite3_mutex_held(pSqlite->mutex) );
assert( db!=0 );
assert( sqlite3_mutex_held(db->mutex) );
pVfs = pSqlite->pVfs;
pVfs = db->pVfs;
p = sqlite3MallocZero(sizeof(Btree));
if( !p ){
return SQLITE_NOMEM;
}
p->inTrans = TRANS_NONE;
p->pSqlite = pSqlite;
p->db = db;
#if !defined(SQLITE_OMIT_SHARED_CACHE) && !defined(SQLITE_OMIT_DISKIO)
/*
@@ -1152,7 +1162,7 @@ int sqlite3BtreeOpen(
*/
if( (flags & BTREE_PRIVATE)==0
&& isMemdb==0
&& (pSqlite->flags & SQLITE_Vtab)==0
&& (db->flags & SQLITE_Vtab)==0
&& zFilename && zFilename[0]
){
if( sqlite3SharedCacheEnabled ){
@@ -1160,8 +1170,8 @@ int sqlite3BtreeOpen(
char *zFullPathname = (char *)sqlite3_malloc(nFullPathname);
sqlite3_mutex *mutexShared;
p->sharable = 1;
if( pSqlite ){
pSqlite->flags |= SQLITE_SharedCache;
if( db ){
db->flags |= SQLITE_SharedCache;
}
if( !zFullPathname ){
sqlite3_free(p);
@@ -1211,6 +1221,8 @@ int sqlite3BtreeOpen(
rc = SQLITE_NOMEM;
goto btree_open_out;
}
pBt->busyHdr.xFunc = sqlite3BtreeInvokeBusyHandler;
pBt->busyHdr.pArg = pBt;
rc = sqlite3PagerOpen(pVfs, &pBt->pPager, zFilename,
EXTRA_SIZE, flags, vfsFlags);
if( rc==SQLITE_OK ){
@@ -1219,6 +1231,7 @@ int sqlite3BtreeOpen(
if( rc!=SQLITE_OK ){
goto btree_open_out;
}
sqlite3PagerSetBusyhandler(pBt->pPager, &pBt->busyHdr);
p->pBt = pBt;
sqlite3PagerSetDestructor(pBt->pPager, pageDestructor);
@@ -1273,7 +1286,7 @@ int sqlite3BtreeOpen(
pBt->mutex = sqlite3_mutex_alloc(SQLITE_MUTEX_FAST);
if( pBt->mutex==0 ){
rc = SQLITE_NOMEM;
pSqlite->mallocFailed = 0;
db->mallocFailed = 0;
goto btree_open_out;
}
}
@@ -1293,8 +1306,8 @@ int sqlite3BtreeOpen(
if( p->sharable ){
int i;
Btree *pSib;
for(i=0; i<pSqlite->nDb; i++){
if( (pSib = pSqlite->aDb[i].pBt)!=0 && pSib->sharable ){
for(i=0; i<db->nDb; i++){
if( (pSib = db->aDb[i].pBt)!=0 && pSib->sharable ){
while( pSib->pPrev ){ pSib = pSib->pPrev; }
if( p->pBt<pSib->pBt ){
p->pNext = pSib;
@@ -1378,8 +1391,9 @@ int sqlite3BtreeClose(Btree *p){
BtCursor *pCur;
/* Close all cursors opened via this handle. */
assert( sqlite3_mutex_held(p->pSqlite->mutex) );
assert( sqlite3_mutex_held(p->db->mutex) );
sqlite3BtreeEnter(p);
pBt->db = p->db;
pCur = pBt->pCursor;
while( pCur ){
BtCursor *pTmp = pCur;
@@ -1427,19 +1441,6 @@ int sqlite3BtreeClose(Btree *p){
return SQLITE_OK;
}
/*
** Change the busy handler callback function.
*/
int sqlite3BtreeSetBusyHandler(Btree *p, BusyHandler *pHandler){
BtShared *pBt = p->pBt;
assert( sqlite3_mutex_held(p->pSqlite->mutex) );
sqlite3BtreeEnter(p);
pBt->pBusyHandler = pHandler;
sqlite3PagerSetBusyhandler(pBt->pPager, pHandler);
sqlite3BtreeLeave(p);
return SQLITE_OK;
}
/*
** Change the limit on the number of pages allowed in the cache.
**
@@ -1457,7 +1458,7 @@ int sqlite3BtreeSetBusyHandler(Btree *p, BusyHandler *pHandler){
*/
int sqlite3BtreeSetCacheSize(Btree *p, int mxPage){
BtShared *pBt = p->pBt;
assert( sqlite3_mutex_held(p->pSqlite->mutex) );
assert( sqlite3_mutex_held(p->db->mutex) );
sqlite3BtreeEnter(p);
sqlite3PagerSetCachesize(pBt->pPager, mxPage);
sqlite3BtreeLeave(p);
@@ -1475,7 +1476,7 @@ int sqlite3BtreeSetCacheSize(Btree *p, int mxPage){
#ifndef SQLITE_OMIT_PAGER_PRAGMAS
int sqlite3BtreeSetSafetyLevel(Btree *p, int level, int fullSync){
BtShared *pBt = p->pBt;
assert( sqlite3_mutex_held(p->pSqlite->mutex) );
assert( sqlite3_mutex_held(p->db->mutex) );
sqlite3BtreeEnter(p);
sqlite3PagerSetSafetyLevel(pBt->pPager, level, fullSync);
sqlite3BtreeLeave(p);
@@ -1490,7 +1491,7 @@ int sqlite3BtreeSetSafetyLevel(Btree *p, int level, int fullSync){
int sqlite3BtreeSyncDisabled(Btree *p){
BtShared *pBt = p->pBt;
int rc;
assert( sqlite3_mutex_held(p->pSqlite->mutex) );
assert( sqlite3_mutex_held(p->db->mutex) );
sqlite3BtreeEnter(p);
assert( pBt && pBt->pPager );
rc = sqlite3PagerNosync(pBt->pPager);
@@ -1823,6 +1824,7 @@ int sqlite3BtreeBeginTrans(Btree *p, int wrflag){
int rc = SQLITE_OK;
sqlite3BtreeEnter(p);
pBt->db = p->db;
btreeIntegrity(p);
/* If the btree is already in a write-transaction, or it
@@ -1870,7 +1872,7 @@ int sqlite3BtreeBeginTrans(Btree *p, int wrflag){
unlockBtreeIfUnused(pBt);
}
}while( rc==SQLITE_BUSY && pBt->inTransaction==TRANS_NONE &&
sqlite3InvokeBusyHandler(pBt->pBusyHandler) );
sqlite3BtreeInvokeBusyHandler(pBt, 0) );
if( rc==SQLITE_OK ){
if( p->inTrans==TRANS_NONE ){
@@ -2196,6 +2198,7 @@ int sqlite3BtreeIncrVacuum(Btree *p){
BtShared *pBt = p->pBt;
sqlite3BtreeEnter(p);
pBt->db = p->db;
assert( pBt->inTransaction==TRANS_WRITE && p->inTrans==TRANS_WRITE );
if( !pBt->autoVacuum ){
rc = SQLITE_DONE;
@@ -2312,6 +2315,7 @@ int sqlite3BtreeCommitPhaseOne(Btree *p, const char *zMaster){
BtShared *pBt = p->pBt;
Pgno nTrunc = 0;
sqlite3BtreeEnter(p);
pBt->db = p->db;
#ifndef SQLITE_OMIT_AUTOVACUUM
if( pBt->autoVacuum ){
rc = autoVacuumCommit(pBt, &nTrunc);
@@ -2345,6 +2349,7 @@ int sqlite3BtreeCommitPhaseTwo(Btree *p){
BtShared *pBt = p->pBt;
sqlite3BtreeEnter(p);
pBt->db = p->db;
btreeIntegrity(p);
/* If the handle has a write-transaction open, commit the shared-btrees
@@ -2465,6 +2470,7 @@ int sqlite3BtreeRollback(Btree *p){
MemPage *pPage1;
sqlite3BtreeEnter(p);
pBt->db = p->db;
rc = saveAllCursors(pBt, 0, 0);
#ifndef SQLITE_OMIT_SHARED_CACHE
if( rc!=SQLITE_OK ){
@@ -2540,6 +2546,7 @@ int sqlite3BtreeBeginStmt(Btree *p){
int rc;
BtShared *pBt = p->pBt;
sqlite3BtreeEnter(p);
pBt->db = p->db;
if( (p->inTrans!=TRANS_WRITE) || pBt->inStmt ){
rc = pBt->readOnly ? SQLITE_READONLY : SQLITE_ERROR;
}else{
@@ -2560,6 +2567,7 @@ int sqlite3BtreeCommitStmt(Btree *p){
int rc;
BtShared *pBt = p->pBt;
sqlite3BtreeEnter(p);
pBt->db = p->db;
if( pBt->inStmt && !pBt->readOnly ){
rc = sqlite3PagerStmtCommit(pBt->pPager);
}else{
@@ -2582,6 +2590,7 @@ int sqlite3BtreeRollbackStmt(Btree *p){
int rc = SQLITE_OK;
BtShared *pBt = p->pBt;
sqlite3BtreeEnter(p);
pBt->db = p->db;
if( pBt->inStmt && !pBt->readOnly ){
rc = sqlite3PagerStmtRollback(pBt->pPager);
assert( countWriteCursors(pBt)==0 );
@@ -2725,6 +2734,7 @@ int sqlite3BtreeCursor(
){
int rc;
sqlite3BtreeEnter(p);
p->pBt->db = p->db;
rc = btreeCursor(p, iTable, wrFlag, xCmp, pArg, ppCur);
sqlite3BtreeLeave(p);
return rc;
@@ -2740,6 +2750,7 @@ int sqlite3BtreeCloseCursor(BtCursor *pCur){
Btree *pBtree = pCur->pBtree;
sqlite3BtreeEnter(pBtree);
pBt->db = pBtree->db;
clearCursorPosition(pCur);
if( pCur->pPrev ){
pCur->pPrev->pNext = pCur->pNext;
@@ -3474,7 +3485,7 @@ int sqlite3BtreeFirst(BtCursor *pCur, int *pRes){
int rc;
assert( cursorHoldsMutex(pCur) );
assert( sqlite3_mutex_held(pCur->pBtree->pSqlite->mutex) );
assert( sqlite3_mutex_held(pCur->pBtree->db->mutex) );
rc = moveToRoot(pCur);
if( rc==SQLITE_OK ){
if( pCur->eState==CURSOR_INVALID ){
@@ -3498,7 +3509,7 @@ int sqlite3BtreeLast(BtCursor *pCur, int *pRes){
int rc;
assert( cursorHoldsMutex(pCur) );
assert( sqlite3_mutex_held(pCur->pBtree->pSqlite->mutex) );
assert( sqlite3_mutex_held(pCur->pBtree->db->mutex) );
rc = moveToRoot(pCur);
if( rc==SQLITE_OK ){
if( CURSOR_INVALID==pCur->eState ){
@@ -3551,7 +3562,7 @@ int sqlite3BtreeMoveto(
int rc;
assert( cursorHoldsMutex(pCur) );
assert( sqlite3_mutex_held(pCur->pBtree->pSqlite->mutex) );
assert( sqlite3_mutex_held(pCur->pBtree->db->mutex) );
rc = moveToRoot(pCur);
if( rc ){
return rc;
@@ -3678,8 +3689,8 @@ int sqlite3BtreeEof(BtCursor *pCur){
** Return the database connection handle for a cursor.
*/
sqlite3 *sqlite3BtreeCursorDb(const BtCursor *pCur){
assert( sqlite3_mutex_held(pCur->pBtree->pSqlite->mutex) );
return pCur->pBtree->pSqlite;
assert( sqlite3_mutex_held(pCur->pBtree->db->mutex) );
return pCur->pBtree->db;
}
/*
@@ -5500,14 +5511,14 @@ static int balance(MemPage *pPage, int insert){
static int checkReadLocks(Btree *pBtree, Pgno pgnoRoot, BtCursor *pExclude){
BtCursor *p;
BtShared *pBt = pBtree->pBt;
sqlite3 *db = pBtree->pSqlite;
sqlite3 *db = pBtree->db;
assert( sqlite3BtreeHoldsMutex(pBtree) );
for(p=pBt->pCursor; p; p=p->pNext){
if( p==pExclude ) continue;
if( p->eState!=CURSOR_VALID ) continue;
if( p->pgnoRoot!=pgnoRoot ) continue;
if( p->wrFlag==0 ){
sqlite3 *dbOther = p->pBtree->pSqlite;
sqlite3 *dbOther = p->pBtree->db;
if( dbOther==0 ||
(dbOther!=db && (dbOther->flags & SQLITE_ReadUncommitted)==0) ){
return SQLITE_LOCKED;
@@ -5880,6 +5891,7 @@ static int btreeCreateTable(Btree *p, int *piTable, int flags){
int sqlite3BtreeCreateTable(Btree *p, int *piTable, int flags){
int rc;
sqlite3BtreeEnter(p);
p->pBt->db = p->db;
rc = btreeCreateTable(p, piTable, flags);
sqlite3BtreeLeave(p);
return rc;
@@ -5944,6 +5956,7 @@ int sqlite3BtreeClearTable(Btree *p, int iTable){
int rc;
BtShared *pBt = p->pBt;
sqlite3BtreeEnter(p);
pBt->db = p->db;
if( p->inTrans!=TRANS_WRITE ){
rc = pBt->readOnly ? SQLITE_READONLY : SQLITE_ERROR;
}else if( (rc = checkReadLocks(p, iTable, 0))!=SQLITE_OK ){
@@ -6087,6 +6100,7 @@ static int btreeDropTable(Btree *p, int iTable, int *piMoved){
int sqlite3BtreeDropTable(Btree *p, int iTable, int *piMoved){
int rc;
sqlite3BtreeEnter(p);
p->pBt->db = p->db;
rc = btreeDropTable(p, iTable, piMoved);
sqlite3BtreeLeave(p);
return rc;
@@ -6110,6 +6124,7 @@ int sqlite3BtreeGetMeta(Btree *p, int idx, u32 *pMeta){
BtShared *pBt = p->pBt;
sqlite3BtreeEnter(p);
pBt->db = p->db;
/* Reading a meta-data value requires a read-lock on page 1 (and hence
** the sqlite_master table. We grab this lock regardless of whether or
@@ -6155,6 +6170,7 @@ int sqlite3BtreeUpdateMeta(Btree *p, int idx, u32 iMeta){
int rc;
assert( idx>=1 && idx<=15 );
sqlite3BtreeEnter(p);
pBt->db = p->db;
if( p->inTrans!=TRANS_WRITE ){
rc = pBt->readOnly ? SQLITE_READONLY : SQLITE_ERROR;
}else{
@@ -6543,6 +6559,7 @@ char *sqlite3BtreeIntegrityCheck(
BtShared *pBt = p->pBt;
sqlite3BtreeEnter(p);
pBt->db = p->db;
nRef = sqlite3PagerRefcount(pBt->pPager);
if( lockBtreeWithRetry(p)!=SQLITE_OK ){
sqlite3BtreeLeave(p);
@@ -6569,7 +6586,7 @@ char *sqlite3BtreeIntegrityCheck(
unlockBtreeIfUnused(pBt);
*pnErr = 1;
sqlite3BtreeLeave(p);
return sqlite3MPrintf(p->pSqlite, "Unable to malloc %d bytes",
return sqlite3MPrintf(p->db, "Unable to malloc %d bytes",
(sCheck.nPage+1)*sizeof(sCheck.anRef[0]));
}
for(i=0; i<=sCheck.nPage; i++){ sCheck.anRef[i] = 0; }
@@ -6686,6 +6703,9 @@ static int btreeCopyFile(Btree *pTo, Btree *pFrom){
BtShared *pBtTo = pTo->pBt;
BtShared *pBtFrom = pFrom->pBt;
pBtTo->db = pTo->db;
pBtFrom->db = pFrom->db;
if( pTo->inTrans!=TRANS_WRITE || pFrom->inTrans!=TRANS_WRITE ){
return SQLITE_ERROR;
@@ -6748,7 +6768,7 @@ int sqlite3BtreeCopyFile(Btree *pTo, Btree *pFrom){
** Return non-zero if a transaction is active.
*/
int sqlite3BtreeIsInTrans(Btree *p){
assert( p==0 || sqlite3_mutex_held(p->pSqlite->mutex) );
assert( p==0 || sqlite3_mutex_held(p->db->mutex) );
return (p && (p->inTrans==TRANS_WRITE));
}
@@ -6764,7 +6784,7 @@ int sqlite3BtreeIsInStmt(Btree *p){
** Return non-zero if a read (or write) transaction is active.
*/
int sqlite3BtreeIsInReadTrans(Btree *p){
assert( sqlite3_mutex_held(p->pSqlite->mutex) );
assert( sqlite3_mutex_held(p->db->mutex) );
return (p && (p->inTrans!=TRANS_NONE));
}
@@ -6801,7 +6821,7 @@ void *sqlite3BtreeSchema(Btree *p, int nBytes, void(*xFree)(void *)){
*/
int sqlite3BtreeSchemaLocked(Btree *p){
int rc;
assert( sqlite3_mutex_held(p->pSqlite->mutex) );
assert( sqlite3_mutex_held(p->db->mutex) );
sqlite3BtreeEnter(p);
rc = (queryTableLock(p, MASTER_ROOT, READ_LOCK)!=SQLITE_OK);
sqlite3BtreeLeave(p);
@@ -6838,7 +6858,7 @@ int sqlite3BtreeLockTable(Btree *p, int iTab, u8 isWriteLock){
*/
int sqlite3BtreePutData(BtCursor *pCsr, u32 offset, u32 amt, void *z){
assert( cursorHoldsMutex(pCsr) );
assert( sqlite3_mutex_held(pCsr->pBtree->pSqlite->mutex) );
assert( sqlite3_mutex_held(pCsr->pBtree->db->mutex) );
assert(pCsr->isIncrblobHandle);
if( pCsr->eState>=CURSOR_REQUIRESEEK ){
if( pCsr->eState==CURSOR_FAULT ){
@@ -6880,7 +6900,7 @@ int sqlite3BtreePutData(BtCursor *pCsr, u32 offset, u32 amt, void *z){
*/
void sqlite3BtreeCacheOverflow(BtCursor *pCur){
assert( cursorHoldsMutex(pCur) );
assert( sqlite3_mutex_held(pCur->pBtree->pSqlite->mutex) );
assert( sqlite3_mutex_held(pCur->pBtree->db->mutex) );
assert(!pCur->isIncrblobHandle);
assert(!pCur->aOverflow);
pCur->isIncrblobHandle = 1;