1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-08-07 02:42:48 +03:00

Implement a B+tree option (all data stored on leaves). (CVS 1365)

FossilOrigin-Name: b8f70d17f06531269caa0a127efb2d25ad0f3e1c
This commit is contained in:
drh
2004-05-12 19:18:15 +00:00
parent 4a1c380a4b
commit 8b18dd4fb5
6 changed files with 237 additions and 62 deletions

View File

@@ -9,7 +9,7 @@
** May you share freely, never taking more than you give.
**
*************************************************************************
** $Id: btree.c,v 1.128 2004/05/12 15:15:47 drh Exp $
** $Id: btree.c,v 1.129 2004/05/12 19:18:16 drh Exp $
**
** This file implements a external (disk-based) database using BTrees.
** For a detailed discussion of BTrees, refer to
@@ -196,8 +196,8 @@ static const char zMagicHeader[] = "SQLite format 3";
*/
#define PTF_INTKEY 0x01
#define PTF_ZERODATA 0x02
#define PTF_LEAF 0x04
/* Idea for the future: PTF_LEAFDATA */
#define PTF_LEAFDATA 0x04
#define PTF_LEAF 0x08
/*
** As each page of the file is loaded into memory, an instance of the following
@@ -216,7 +216,9 @@ struct MemPage {
u8 isOverfull; /* Some aCell[] do not fit on page */
u8 intKey; /* True if intkey flag is set */
u8 leaf; /* True if leaf flag is set */
u8 zeroData; /* True if zero data flag is set */
u8 zeroData; /* True if table stores keys only */
u8 leafData; /* True if tables stores data on leaves only */
u8 hasData; /* True if this page stores data */
u8 hdrOffset; /* 100 for page 1. 0 otherwise */
u8 needRelink; /* True if need to run relinkCellList() */
int idxParent; /* Index in pParent->aCell[] of this node */
@@ -345,10 +347,10 @@ static void parseCellHeader(
}else{
n = 6;
}
if( pPage->zeroData ){
*pnData = 0;
}else{
if( pPage->hasData ){
n += getVarint(&pCell[n], pnData);
}else{
*pnData = 0;
}
n += getVarint(&pCell[n], (u64*)pnKey);
*pnHeader = n;
@@ -402,7 +404,10 @@ static void _pageIntegrity(MemPage *pPage){
if( pPage->isInit ){
assert( pPage->leaf == ((c & PTF_LEAF)!=0) );
assert( pPage->zeroData == ((c & PTF_ZERODATA)!=0) );
assert( pPage->intKey == ((c & PTF_INTKEY)!=0) );
assert( pPage->leafData == ((c & PTF_LEAFDATA)!=0) );
assert( pPage->intKey == ((c & (PTF_INTKEY|PTF_LEAFDATA))!=0) );
assert( pPage->hasData ==
!(pPage->zeroData || (!pPage->leaf && pPage->leafData)) );
}
data = pPage->aData;
memset(used, 0, pageSize);
@@ -689,9 +694,11 @@ static int initPage(
hdr = pPage->hdrOffset;
data = pPage->aData;
c = data[hdr];
pPage->intKey = (c & PTF_INTKEY)!=0;
pPage->intKey = (c & (PTF_INTKEY|PTF_LEAFDATA))!=0;
pPage->zeroData = (c & PTF_ZERODATA)!=0;
pPage->leafData = (c & PTF_LEAFDATA)!=0;
pPage->leaf = (c & PTF_LEAF)!=0;
pPage->hasData = !(pPage->zeroData || (!pPage->leaf && pPage->leafData));
pPage->isOverfull = 0;
pPage->needRelink = 0;
pPage->idxShift = 0;
@@ -763,9 +770,11 @@ static void zeroPage(MemPage *pPage, int flags){
pPage->nCell = 0;
pPage->nCellAlloc = 0;
pPage->nFree = pBt->pageSize - first;
pPage->intKey = (flags & PTF_INTKEY)!=0;
pPage->leaf = (flags & PTF_LEAF)!=0;
pPage->intKey = (flags & (PTF_INTKEY|PTF_LEAFDATA))!=0;
pPage->zeroData = (flags & PTF_ZERODATA)!=0;
pPage->leafData = (flags & PTF_LEAFDATA)!=0;
pPage->leaf = (flags & PTF_LEAF)!=0;
pPage->hasData = !(pPage->zeroData || (!pPage->leaf && pPage->leafData));
pPage->hdrOffset = hdr;
pPage->isOverfull = 0;
pPage->needRelink = 0;
@@ -1033,7 +1042,7 @@ static int newDatabase(Btree *pBt){
data[18] = 1;
data[19] = 1;
put2byte(&data[22], (SQLITE_PAGE_SIZE-10)/4-12);
zeroPage(pP1, PTF_INTKEY|PTF_LEAF);
zeroPage(pP1, PTF_INTKEY|PTF_LEAF );
return SQLITE_OK;
}
@@ -1410,7 +1419,7 @@ int sqlite3BtreeKeySize(BtCursor *pCur, i64 *pSize){
if( !pPage->leaf ){
cell += 4; /* Skip the child pointer */
}
if( !pPage->zeroData ){
if( pPage->hasData ){
while( (0x80&*(cell++))!=0 ){} /* Skip the data size number */
}
getVarint(cell, pSize);
@@ -1437,7 +1446,7 @@ int sqlite3BtreeDataSize(BtCursor *pCur, u32 *pSize){
assert( pPage!=0 );
assert( pPage->isInit );
pageIntegrity(pPage);
if( pPage->zeroData ){
if( !pPage->hasData ){
*pSize = 0;
}else{
assert( pCur->idx>=0 && pCur->idx<pPage->nCell );
@@ -1488,10 +1497,10 @@ static int getPayload(
if( !pPage->leaf ){
aPayload += 4; /* Skip the child pointer */
}
if( pPage->zeroData ){
nData = 0;
}else{
if( pPage->hasData ){
aPayload += getVarint(aPayload, &nData);
}else{
nData = 0;
}
aPayload += getVarint(aPayload, (u64*)&nKey);
if( pPage->intKey ){
@@ -1635,10 +1644,10 @@ static const unsigned char *fetchPayload(
if( !pPage->leaf ){
aPayload += 4; /* Skip the child pointer */
}
if( pPage->zeroData ){
nData = 0;
}else{
if( pPage->hasData ){
aPayload += getVarint(aPayload, &nData);
}else{
nData = 0;
}
aPayload += getVarint(aPayload, (u64*)&nKey);
if( pPage->intKey ){
@@ -1977,9 +1986,13 @@ int sqlite3BtreeMoveto(BtCursor *pCur, const void *pKey, i64 nKey, int *pRes){
if( rc ) return rc;
}
if( c==0 ){
pCur->iMatch = c;
if( pRes ) *pRes = 0;
return SQLITE_OK;
if( pPage->leafData && !pPage->leaf ){
break;
}else{
pCur->iMatch = c;
if( pRes ) *pRes = 0;
return SQLITE_OK;
}
}
if( c<0 ){
lwr = pCur->idx+1;
@@ -2031,6 +2044,7 @@ int sqlite3BtreeEof(BtCursor *pCur){
int sqlite3BtreeNext(BtCursor *pCur, int *pRes){
int rc;
MemPage *pPage = pCur->pPage;
assert( pRes!=0 );
if( pCur->isValid==0 ){
*pRes = 1;
@@ -2057,7 +2071,12 @@ int sqlite3BtreeNext(BtCursor *pCur, int *pRes){
pPage = pCur->pPage;
}while( pCur->idx>=pPage->nCell );
*pRes = 0;
return SQLITE_OK;
if( pPage->leafData ){
rc = sqlite3BtreeNext(pCur, pRes);
}else{
rc = SQLITE_OK;
}
return rc;
}
*pRes = 0;
if( pPage->leaf ){
@@ -2100,7 +2119,11 @@ int sqlite3BtreePrevious(BtCursor *pCur, int *pRes){
pPage = pCur->pPage;
}
pCur->idx--;
rc = SQLITE_OK;
if( pPage->leafData ){
rc = sqlite3BtreePrevious(pCur, pRes);
}else{
rc = SQLITE_OK;
}
}
*pRes = 0;
return rc;
@@ -2335,13 +2358,13 @@ static int fillInCell(
if( !pPage->leaf ){
nHeader += 4;
}
if( !pPage->zeroData ){
if( pPage->hasData ){
nHeader += putVarint(&pCell[nHeader], nData);
}
nHeader += putVarint(&pCell[nHeader], *(u64*)&nKey);
/* Fill in the payload */
if( pPage->zeroData ){
if( !pPage->hasData ){
nData = 0;
}
nPayload = nData;
@@ -2687,6 +2710,7 @@ static int balance(MemPage *pPage){
int nxDiv; /* Next divider slot in pParent->aCell[] */
int rc; /* The return code */
int leafCorrection; /* 4 if pPage is a leaf. 0 if not */
int leafData; /* True if pPage is a leaf of a LEAFDATA tree */
int usableSpace; /* Bytes in pPage beyond the header */
int pageFlags; /* Value of pPage->aData[0] */
int subtotal; /* Subtotal of bytes in cells on one page */
@@ -2713,7 +2737,7 @@ static int balance(MemPage *pPage){
assert( pPage->isInit );
assert( sqlite3pager_iswriteable(pPage->aData) );
pBt = pPage->pBt;
if( !pPage->isOverfull && pPage->nFree<pBt->pageSize/2 && pPage->nCell>=2){
if( !pPage->isOverfull && pPage->nFree<pBt->pageSize*2/3 && pPage->nCell>=2){
relinkCellList(pPage);
return SQLITE_OK;
}
@@ -2912,6 +2936,7 @@ static int balance(MemPage *pPage){
*/
nCell = 0;
leafCorrection = pPage->leaf*4;
leafData = pPage->leafData && pPage->leaf;
for(i=0; i<nOld; i++){
MemPage *pOld = apCopy[i];
for(j=0; j<pOld->nCell; j++){
@@ -2920,21 +2945,26 @@ static int balance(MemPage *pPage){
nCell++;
}
if( i<nOld-1 ){
szCell[nCell] = cellSize(pParent, apDiv[i]);
memcpy(aTemp[i], apDiv[i], szCell[nCell]);
apCell[nCell] = &aTemp[i][leafCorrection];
dropCell(pParent, nxDiv, szCell[nCell]);
szCell[nCell] -= leafCorrection;
assert( get4byte(&aTemp[i][2])==pgnoOld[i] );
if( !pOld->leaf ){
assert( leafCorrection==0 );
/* The right pointer of the child page pOld becomes the left
** pointer of the divider cell */
memcpy(&apCell[nCell][2], &pOld->aData[pOld->hdrOffset+6], 4);
if( leafData ){
int sz = cellSize(pParent, apDiv[i]);
dropCell(pParent, nxDiv, sz);
}else{
assert( leafCorrection==4 );
szCell[nCell] = cellSize(pParent, apDiv[i]);
memcpy(aTemp[i], apDiv[i], szCell[nCell]);
apCell[nCell] = &aTemp[i][leafCorrection];
dropCell(pParent, nxDiv, szCell[nCell]);
szCell[nCell] -= leafCorrection;
assert( get4byte(&aTemp[i][2])==pgnoOld[i] );
if( !pOld->leaf ){
assert( leafCorrection==0 );
/* The right pointer of the child page pOld becomes the left
** pointer of the divider cell */
memcpy(&apCell[nCell][2], &pOld->aData[pOld->hdrOffset+6], 4);
}else{
assert( leafCorrection==4 );
}
nCell++;
}
nCell++;
}
}
@@ -2954,6 +2984,7 @@ static int balance(MemPage *pPage){
if( subtotal > usableSpace ){
szNew[k] = subtotal - szCell[i];
cntNew[k] = i;
if( leafData ){ i--; }
subtotal = 0;
k++;
}
@@ -3064,16 +3095,28 @@ static int balance(MemPage *pPage){
assert( !pNew->isOverfull );
relinkCellList(pNew);
if( i<nNew-1 && j<nCell ){
u8 *pCell = apCell[j];
u8 *pCell;
u8 *pTemp;
int sz;
pCell = apCell[j];
sz = szCell[j] + leafCorrection;
if( !pNew->leaf ){
memcpy(&pNew->aData[6], pCell+2, 4);
pTemp = 0;
}else if( leafData ){
i64 nKey;
u64 nData;
int nHeader;
j--;
parseCellHeader(pNew, apCell[j], &nData, &nKey, &nHeader);
pCell = aInsBuf[i];
fillInCell(pParent, pCell, 0, nKey, 0, 0, &sz);
pTemp = 0;
}else{
pCell -= 4;
pTemp = aInsBuf[i];
}
insertCell(pParent, nxDiv, pCell, szCell[j]+leafCorrection, pTemp);
insertCell(pParent, nxDiv, pCell, sz, pTemp);
put4byte(&pParent->aCell[nxDiv][2], pNew->pgno);
j++;
nxDiv++;
@@ -3200,6 +3243,7 @@ int sqlite3BtreeInsert(
if( rc ) return rc;
pPage = pCur->pPage;
assert( pPage->intKey || nKey>=0 );
assert( pPage->leaf || !pPage->leafData );
TRACE(("INSERT: table=%d nkey=%lld ndata=%d page=%d %s\n",
pCur->pgnoRoot, nKey, nData, pPage->pgno,
loc==0 ? "overwrite" : "new entry"));
@@ -3284,6 +3328,7 @@ int sqlite3BtreeDelete(BtCursor *pCur){
int szNext;
int notUsed;
unsigned char tempCell[MX_CELL_SIZE];
assert( !pPage->leafData );
getTempCursor(pCur, &leafCur);
rc = sqlite3BtreeNext(&leafCur, &notUsed);
if( rc!=SQLITE_OK ){
@@ -3518,9 +3563,11 @@ int sqlite3BtreePageDump(Btree *pBt, int pgno, int recursive){
hdr = pPage->hdrOffset;
data = pPage->aData;
c = data[hdr];
pPage->intKey = (c & PTF_INTKEY)!=0;
pPage->intKey = (c & (PTF_INTKEY|PTF_LEAFDATA))!=0;
pPage->zeroData = (c & PTF_ZERODATA)!=0;
pPage->leafData = (c & PTF_LEAFDATA)!=0;
pPage->leaf = (c & PTF_LEAF)!=0;
pPage->hasData = !(pPage->zeroData || (!pPage->leaf && pPage->leafData));
printf("PAGE %d: flags=0x%02x frag=%d parent=%d\n", pgno,
data[hdr], data[hdr+5],
(pPage->isInit && pPage->pParent) ? pPage->pParent->pgno : 0);