1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-08-07 02:42:48 +03:00

Enhance the performance of auto-vacuum databases by reducing the number of pointer-map entries written during tree balancing. Also fix bugs in balance_quick(). (CVS 2216)

FossilOrigin-Name: 0ae29538ccccfc237904cbcfb4507074db0f5905
This commit is contained in:
danielk1977
2005-01-15 12:45:51 +00:00
parent 018d1a4929
commit ac11ee6766
6 changed files with 182 additions and 52 deletions

View File

@@ -9,7 +9,7 @@
** May you share freely, never taking more than you give.
**
*************************************************************************
** $Id: btree.c,v 1.233 2005/01/14 22:55:49 drh Exp $
** $Id: btree.c,v 1.234 2005/01/15 12:45:51 danielk1977 Exp $
**
** This file implements a external (disk-based) database using BTrees.
** For a detailed discussion of BTrees, refer to
@@ -463,6 +463,7 @@ static int ptrmapPut(Btree *pBt, Pgno key, u8 eType, Pgno pgno){
int offset; /* Offset in pointer map page */
int rc;
assert( pBt->autoVacuum );
assert( key!=0 );
iPtrmap = PTRMAP_PAGENO(pBt->usableSize, key);
rc = sqlite3pager_get(pBt->pPager, iPtrmap, (void **)&pPtrmap);
@@ -642,6 +643,19 @@ static int cellSizePtr(MemPage *pPage, u8 *pCell){
return info.nSize;
}
/*
** If pCell, part of pPage, contains a pointer to an overflow page,
** return the overflow page number. Otherwise return 0.
*/
static Pgno ovflPagePtr(MemPage *pPage, u8 *pCell){
CellInfo info;
parseCellPtr(pPage, pCell, &info);
if( (info.nData+(pPage->intKey?0:info.nKey))>info.nLocal ){
return get4byte(&pCell[info.iOverflow]);
}
return 0;
}
/*
** Do sanity checking on a page. Throw an exception if anything is
** not right.
@@ -1812,7 +1826,10 @@ static int autoVacuumCommit(Btree *pBt, Pgno *nTrunc){
pFreeMemPage = 0;
}
rc = allocatePage(pBt, &pFreeMemPage, &iFreePage, 0, 0);
if( rc!=SQLITE_OK ) goto autovacuum_out;
if( rc!=SQLITE_OK ){
releasePage(pDbMemPage);
goto autovacuum_out;
}
assert( iFreePage<=origSize );
}while( iFreePage>finSize );
releasePage(pFreeMemPage);
@@ -3320,6 +3337,8 @@ static int reparentPage(Btree *pBt, Pgno pgno, MemPage *pNewParent, int idx){
return SQLITE_OK;
}
/*
** Change the pParent pointer of all children of pPage to point back
** to pPage.
@@ -3335,11 +3354,7 @@ static int reparentChildPages(MemPage *pPage){
Btree *pBt = pPage->pBt;
int rc = SQLITE_OK;
#ifdef SQLITE_OMIT_AUTOVACUUM
if( pPage->leaf ) return SQLITE_OK;
#else
if( !pBt->autoVacuum && pPage->leaf ) return SQLITE_OK;
#endif
for(i=0; i<pPage->nCell; i++){
u8 *pCell = findCell(pPage, i);
@@ -3347,23 +3362,6 @@ static int reparentChildPages(MemPage *pPage){
rc = reparentPage(pBt, get4byte(pCell), pPage, i);
if( rc!=SQLITE_OK ) return rc;
}
#ifndef SQLITE_OMIT_AUTOVACUUM
/* If the database supports auto-vacuum, then check each cell to see
** if it contains a pointer to an overflow page. If so, then the
** pointer-map must be updated accordingly.
**
** TODO: This looks like quite an expensive thing to do. Investigate.
*/
if( pBt->autoVacuum ){
CellInfo info;
parseCellPtr(pPage, pCell, &info);
if( (info.nData+(pPage->intKey?0:info.nKey))>info.nLocal ){
Pgno pgnoOvfl = get4byte(&pCell[info.iOverflow]);
rc = ptrmapPut(pBt, pgnoOvfl, PTRMAP_OVERFLOW1, pPage->pgno);
if( rc!=SQLITE_OK ) return rc;
}
}
#endif
}
if( !pPage->leaf ){
rc = reparentPage(pBt, get4byte(&pPage->aData[pPage->hdrOffset+8]),
@@ -3582,6 +3580,7 @@ static int balance_quick(MemPage *pPage, MemPage *pParent){
u8 *pCell;
int szCell;
CellInfo info;
Btree *pBt = pPage->pBt;
u8 parentCell[64]; /* How big should this be? */
int parentIdx = pParent->nCell;
@@ -3590,7 +3589,7 @@ static int balance_quick(MemPage *pPage, MemPage *pParent){
/* Allocate a new page. Insert the overflow cell from pPage
** into it. Then remove the overflow cell from pPage.
*/
rc = allocatePage(pPage->pBt, &pNew, &pgnoNew, 0, 0);
rc = allocatePage(pBt, &pNew, &pgnoNew, 0, 0);
if( rc!=SQLITE_OK ){
return rc;
}
@@ -3605,6 +3604,7 @@ static int balance_quick(MemPage *pPage, MemPage *pParent){
** pPage is the next-to-right child. Then balance() the parent
** page, in case it is now overfull.
*/
assert( pPage->nCell>0 );
parseCellPtr(pPage, findCell(pPage, pPage->nCell-1), &info);
rc = fillInCell(pParent, parentCell, 0, info.nKey, 0, 0, &parentSize);
if( rc!=SQLITE_OK ){
@@ -3621,10 +3621,39 @@ static int balance_quick(MemPage *pPage, MemPage *pParent){
pNew->pParent = pParent;
sqlite3pager_ref(pParent->aData);
if( pBt->autoVacuum ){
rc = ptrmapPut(pBt, pgnoNew, PTRMAP_BTREE, pParent->pgno);
if( rc!=SQLITE_OK ){
return rc;
}
pCell = findCell(pNew, 0);
parseCellPtr(pNew, pCell, &info);
if( info.nData>info.nLocal ){
Pgno pgnoOvfl = get4byte(&pCell[info.iOverflow]);
rc = ptrmapPut(pBt, pgnoOvfl, PTRMAP_OVERFLOW1, pgnoNew);
if( rc!=SQLITE_OK ){
return rc;
}
}
}
releasePage(pNew);
return balance(pParent, 0);
}
/*
** The ISAUTOVACUUM macro is used within balance_nonroot() to determine
** if the database supports auto-vacuum or not. Because it is used
** within an expression that is an argument to another macro
** (sqliteMallocRaw), it is not possible to use conditional compilation.
** So, this macro is defined instead.
*/
#ifndef SQLITE_OMIT_AUTOVACUUM
#define ISAUTOVACUUM (pBt->autoVacuum)
#else
#define ISAUTOVACUUM 0
#endif
/*
** This routine redistributes Cells on pPage and up to NN*2 siblings
** of pPage so that all pages have about the same amount of free space.
@@ -3685,6 +3714,9 @@ static int balance_nonroot(MemPage *pPage){
int *szCell; /* Local size of all cells in apCell[] */
u8 *aCopy[NB]; /* Space for holding data of apCopy[] */
u8 *aSpace; /* Space to hold copies of dividers cells */
#ifndef SQLITE_OMIT_VACUUM
u8 *aFrom = 0;
#endif
/*
** Find the parent page.
@@ -3711,8 +3743,13 @@ static int balance_nonroot(MemPage *pPage){
pPage->leafData &&
pPage->nOverflow==1 &&
pPage->aOvfl[0].idx==pPage->nCell &&
pPage->pParent->pgno!=1 &&
get4byte(&pParent->aData[pParent->hdrOffset+8])==pPage->pgno
){
/*
** TODO: Check the siblings to the left of pPage. It may be that
** they are not full and no new page is required.
*/
return balance_quick(pPage, pParent);
}
#endif
@@ -3725,6 +3762,7 @@ static int balance_nonroot(MemPage *pPage){
(mxCellPerPage+2)*NB*(sizeof(u8*)+sizeof(int))
+ sizeof(MemPage)*NB
+ pBt->psAligned*(5+NB)
+ (ISAUTOVACUUM ? (mxCellPerPage+2)*NN*2 : 0)
);
if( apCell==0 ){
return SQLITE_NOMEM;
@@ -3735,6 +3773,11 @@ static int balance_nonroot(MemPage *pPage){
aCopy[i] = &aCopy[i-1][pBt->psAligned+sizeof(MemPage)];
}
aSpace = &aCopy[NB-1][pBt->psAligned+sizeof(MemPage)];
#ifndef SQLITE_OMIT_AUTOVACUUM
if( pBt->autoVacuum ){
aFrom = &aSpace[5*pBt->psAligned];
}
#endif
/*
** Find the cell in the parent page whose left child points back
@@ -3836,6 +3879,18 @@ static int balance_nonroot(MemPage *pPage){
for(j=0; j<limit; j++){
apCell[nCell] = findOverflowCell(pOld, j);
szCell[nCell] = cellSizePtr(pOld, apCell[nCell]);
#ifndef SQLITE_OMIT_AUTOVACUUM
if( pBt->autoVacuum ){
int a;
aFrom[nCell] = i;
for(a=0; a<pOld->nOverflow; a++){
if( pOld->aOvfl[a].pCell==apCell[nCell] ){
aFrom[nCell] = 0xFF;
break;
}
}
}
#endif
nCell++;
}
if( i<nOld-1 ){
@@ -3855,6 +3910,11 @@ static int balance_nonroot(MemPage *pPage){
assert( iSpace<=pBt->psAligned*5 );
memcpy(pTemp, apDiv[i], sz);
apCell[nCell] = pTemp+leafCorrection;
#ifndef SQLITE_OMIT_AUTOVACUUM
if( pBt->autoVacuum ){
aFrom[nCell] = 0xFF;
}
#endif
dropCell(pParent, nxDiv, sz);
szCell[nCell] -= leafCorrection;
assert( get4byte(pTemp)==pgnoOld[i] );
@@ -4014,12 +4074,39 @@ static int balance_nonroot(MemPage *pPage){
*/
j = 0;
for(i=0; i<nNew; i++){
/* Assemble the new sibling page. */
MemPage *pNew = apNew[i];
assert( pNew->pgno==pgnoNew[i] );
assemblePage(pNew, cntNew[i]-j, &apCell[j], &szCell[j]);
j = cntNew[i];
assert( pNew->nCell>0 );
assert( pNew->nOverflow==0 );
#ifndef SQLITE_OMIT_AUTOVACUUM
/* If this is an auto-vacuum database, update the pointer map entries
** that point to the siblings that were rearranged. These can be: left
** children of cells, the right-child of the page, or overflow pages
** pointed to by cells.
*/
if( pBt->autoVacuum ){
for(k=j; k<cntNew[i]; k++){
if( aFrom[k]==0xFF || apCopy[aFrom[k]]->pgno!=pNew->pgno ){
Pgno ovfl = ovflPagePtr(pNew, findCell(pNew, k-j));
if( ovfl ){
rc = ptrmapPut(pBt, ovfl, PTRMAP_OVERFLOW1, pNew->pgno);
if( rc!=SQLITE_OK ){
goto balance_cleanup;
}
}
}
}
}
#endif
j = cntNew[i];
/* If the sibling page assembled above was not the right-most sibling,
** insert a divider cell into the parent page.
*/
if( i<nNew-1 && j<nCell ){
u8 *pCell;
u8 *pTemp;
@@ -4030,6 +4117,11 @@ static int balance_nonroot(MemPage *pPage){
memcpy(&pNew->aData[8], pCell, 4);
pTemp = 0;
}else if( leafData ){
/* If the tree is a leaf-data tree, and the siblings are leaves,
** then there is no divider cell in apCell[]. Instead, the divider
** cell consists of the integer key for the right-most cell of
** the sibling-page assembled above only.
*/
CellInfo info;
j--;
parseCellPtr(pNew, apCell[j], &info);
@@ -4047,6 +4139,21 @@ static int balance_nonroot(MemPage *pPage){
rc = insertCell(pParent, nxDiv, pCell, sz, pTemp, 4);
if( rc!=SQLITE_OK ) goto balance_cleanup;
put4byte(findOverflowCell(pParent,nxDiv), pNew->pgno);
#ifndef SQLITE_OMIT_AUTOVACUUM
/* If this is an auto-vacuum database, and not a leaf-data tree,
** then update the pointer map with an entry for the overflow page
** that the cell just inserted points to (if any).
*/
if( pBt->autoVacuum && !leafData ){
Pgno ovfl = ovflPagePtr(pParent, findOverflowCell(pParent, nxDiv));
if( ovfl ){
rc = ptrmapPut(pBt, ovfl, PTRMAP_OVERFLOW1, pParent->pgno);
if( rc!=SQLITE_OK ){
goto balance_cleanup;
}
}
}
#endif
j++;
nxDiv++;
}
@@ -4076,7 +4183,7 @@ static int balance_nonroot(MemPage *pPage){
/*
** Balance the parent page. Note that the current page (pPage) might
** have been added to the freelist is it might no longer be initialized.
** have been added to the freelist so it might no longer be initialized.
** But the parent page will always be initialized.
*/
assert( pParent->isInit );
@@ -4114,6 +4221,7 @@ static int balance_shallower(MemPage *pPage){
int mxCellPerPage; /* Maximum number of cells per page */
u8 **apCell; /* All cells from pages being balanced */
int *szCell; /* Local size of all cells */
int i;
assert( pPage->pParent==0 );
assert( pPage->nCell==0 );
@@ -4178,6 +4286,20 @@ static int balance_shallower(MemPage *pPage){
pChild->pgno, pPage->pgno));
}
rc = reparentChildPages(pPage);
assert( pPage->nOverflow==0 );
#ifndef SQLITE_OMIT_AUTOVACUUM
if( pBt->autoVacuum ){
for(i=0; i<pPage->nCell; i++){
Pgno ovfl = ovflPagePtr(pPage, findCell(pPage, i));
if( ovfl ){
rc = ptrmapPut(pBt, ovfl, PTRMAP_OVERFLOW1, pPage->pgno);
if( rc!=SQLITE_OK ){
goto end_shallow_balance;
}
}
}
}
#endif
if( rc!=SQLITE_OK ) goto end_shallow_balance;
releasePage(pChild);
}
@@ -4232,6 +4354,18 @@ static int balance_deeper(MemPage *pPage){
zeroPage(pPage, pChild->aData[0] & ~PTF_LEAF);
put4byte(&pPage->aData[pPage->hdrOffset+8], pgnoChild);
TRACE(("BALANCE: copy root %d into %d\n", pPage->pgno, pChild->pgno));
if( pBt->autoVacuum ){
int i;
rc = ptrmapPut(pBt, pChild->pgno, PTRMAP_BTREE, pPage->pgno);
if( rc ) return rc;
for(i=0; i<pChild->nCell; i++){
Pgno pgno = ovflPagePtr(pChild, findOverflowCell(pChild, i));
if( pgno ){
rc = ptrmapPut(pBt, pgno, PTRMAP_OVERFLOW1, pChild->pgno);
if( rc ) return rc;
}
}
}
rc = balance_nonroot(pChild);
releasePage(pChild);
return rc;