1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-08-07 02:42:48 +03:00

Add the MemPage.xParseCell method and provide various implementations

(variations on the former btreeParseCellPtr()) depending on the page type.

FossilOrigin-Name: 41d03d883c4f7ca279eb9dd679f3ab81c8d957d9
This commit is contained in:
drh
2015-06-19 17:19:34 +00:00
parent 25ada07ab4
commit 5fa605142f
4 changed files with 141 additions and 70 deletions

View File

@@ -980,11 +980,73 @@ static u8 *findOverflowCell(MemPage *pPage, int iCell){
}
/*
** Parse a cell content block and fill in the CellInfo structure. There
** are two versions of this function. btreeParseCell() takes a
** cell index as the second argument and btreeParseCellPtr()
** takes a pointer to the body of the cell as its second argument.
** This is common tail processing for btreeParseCellPtr() and
** btreeParseCellPtrIndex() for the case when the cell does not fit entirely
** on a single B-tree page. Make necessary adjustments to the CellInfo
** structure.
*/
static SQLITE_NOINLINE void btreeParseCellAdjustSizeForOverflow(
MemPage *pPage, /* Page containing the cell */
u8 *pCell, /* Pointer to the cell text. */
CellInfo *pInfo /* Fill in this structure */
){
/* If the payload will not fit completely on the local page, we have
** to decide how much to store locally and how much to spill onto
** overflow pages. The strategy is to minimize the amount of unused
** space on overflow pages while keeping the amount of local storage
** in between minLocal and maxLocal.
**
** Warning: changing the way overflow payload is distributed in any
** way will result in an incompatible file format.
*/
int minLocal; /* Minimum amount of payload held locally */
int maxLocal; /* Maximum amount of payload held locally */
int surplus; /* Overflow payload available for local storage */
minLocal = pPage->minLocal;
maxLocal = pPage->maxLocal;
surplus = minLocal + (pInfo->nPayload - minLocal)%(pPage->pBt->usableSize-4);
testcase( surplus==maxLocal );
testcase( surplus==maxLocal+1 );
if( surplus <= maxLocal ){
pInfo->nLocal = (u16)surplus;
}else{
pInfo->nLocal = (u16)minLocal;
}
pInfo->iOverflow = (u16)(&pInfo->pPayload[pInfo->nLocal] - pCell);
pInfo->nSize = pInfo->iOverflow + 4;
}
/*
** The following routines are implementations of the MemPage.xParseCell()
** method.
**
** Parse a cell content block and fill in the CellInfo structure.
**
** btreeParseCellPtr() => table btree leaf nodes
** btreeParseCellNoPayload() => table btree internal nodes
** btreeParseCellPtrIndex() => index btree nodes
**
** There is also a wrapper function btreeParseCell() that works for
** all MemPage types and that references the cell by index rather than
** by pointer.
*/
static void btreeParseCellPtrNoPayload(
MemPage *pPage, /* Page containing the cell */
u8 *pCell, /* Pointer to the cell text. */
CellInfo *pInfo /* Fill in this structure */
){
assert( sqlite3_mutex_held(pPage->pBt->mutex) );
assert( pPage->leaf==0 );
assert( pPage->noPayload );
assert( pPage->childPtrSize==4 );
pInfo->nSize = 4 + getVarint(&pCell[4], (u64*)&pInfo->nKey);
pInfo->nPayload = 0;
pInfo->nLocal = 0;
pInfo->iOverflow = 0;
pInfo->pPayload = 0;
return;
}
static void btreeParseCellPtr(
MemPage *pPage, /* Page containing the cell */
u8 *pCell, /* Pointer to the cell text. */
@@ -995,23 +1057,12 @@ static void btreeParseCellPtr(
assert( sqlite3_mutex_held(pPage->pBt->mutex) );
assert( pPage->leaf==0 || pPage->leaf==1 );
if( pPage->intKeyLeaf ){
assert( pPage->childPtrSize==0 );
pIter = pCell + getVarint32(pCell, nPayload);
pIter += getVarint(pIter, (u64*)&pInfo->nKey);
}else if( pPage->noPayload ){
assert( pPage->childPtrSize==4 );
pInfo->nSize = 4 + getVarint(&pCell[4], (u64*)&pInfo->nKey);
pInfo->nPayload = 0;
pInfo->nLocal = 0;
pInfo->iOverflow = 0;
pInfo->pPayload = 0;
return;
}else{
pIter = pCell + pPage->childPtrSize;
pIter += getVarint32(pIter, nPayload);
pInfo->nKey = nPayload;
}
assert( pPage->intKeyLeaf || pPage->noPayload );
assert( pPage->noPayload==0 );
assert( pPage->intKeyLeaf );
assert( pPage->childPtrSize==0 );
pIter = pCell + getVarint32(pCell, nPayload);
pIter += getVarint(pIter, (u64*)&pInfo->nKey);
pInfo->nPayload = nPayload;
pInfo->pPayload = pIter;
testcase( nPayload==pPage->maxLocal );
@@ -1025,31 +1076,46 @@ static void btreeParseCellPtr(
pInfo->nLocal = (u16)nPayload;
pInfo->iOverflow = 0;
}else{
/* If the payload will not fit completely on the local page, we have
** to decide how much to store locally and how much to spill onto
** overflow pages. The strategy is to minimize the amount of unused
** space on overflow pages while keeping the amount of local storage
** in between minLocal and maxLocal.
**
** Warning: changing the way overflow payload is distributed in any
** way will result in an incompatible file format.
*/
int minLocal; /* Minimum amount of payload held locally */
int maxLocal; /* Maximum amount of payload held locally */
int surplus; /* Overflow payload available for local storage */
btreeParseCellAdjustSizeForOverflow(pPage, pCell, pInfo);
}
}
static void btreeParseCellPtrIndex(
MemPage *pPage, /* Page containing the cell */
u8 *pCell, /* Pointer to the cell text. */
CellInfo *pInfo /* Fill in this structure */
){
u8 *pIter; /* For scanning through pCell */
u32 nPayload; /* Number of bytes of cell payload */
minLocal = pPage->minLocal;
maxLocal = pPage->maxLocal;
surplus = minLocal + (nPayload - minLocal)%(pPage->pBt->usableSize - 4);
testcase( surplus==maxLocal );
testcase( surplus==maxLocal+1 );
if( surplus <= maxLocal ){
pInfo->nLocal = (u16)surplus;
}else{
pInfo->nLocal = (u16)minLocal;
}
pInfo->iOverflow = (u16)(&pInfo->pPayload[pInfo->nLocal] - pCell);
pInfo->nSize = pInfo->iOverflow + 4;
assert( sqlite3_mutex_held(pPage->pBt->mutex) );
assert( pPage->leaf==0 || pPage->leaf==1 );
assert( pPage->intKeyLeaf==0 );
assert( pPage->noPayload==0 );
pIter = pCell + pPage->childPtrSize;
nPayload = *pIter;
if( nPayload>=0x80 ){
u8 *pEnd = &pIter[9];
nPayload &= 0x7f;
do{
nPayload = (nPayload<<7) | (*++pIter & 0x7f);
}while( *(pIter)>=0x80 && pIter<pEnd );
}
pIter++;
pInfo->nKey = nPayload;
pInfo->nPayload = nPayload;
pInfo->pPayload = pIter;
testcase( nPayload==pPage->maxLocal );
testcase( nPayload==pPage->maxLocal+1 );
if( nPayload<=pPage->maxLocal ){
/* This is the (easy) common case where the entire payload fits
** on the local page. No overflow is required.
*/
pInfo->nSize = nPayload + (u16)(pIter - pCell);
if( pInfo->nSize<4 ) pInfo->nSize = 4;
pInfo->nLocal = (u16)nPayload;
pInfo->iOverflow = 0;
}else{
btreeParseCellAdjustSizeForOverflow(pPage, pCell, pInfo);
}
}
static void btreeParseCell(
@@ -1057,19 +1123,20 @@ static void btreeParseCell(
int iCell, /* The cell index. First cell is 0 */
CellInfo *pInfo /* Fill in this structure */
){
btreeParseCellPtr(pPage, findCell(pPage, iCell), pInfo);
pPage->xParseCell(pPage, findCell(pPage, iCell), pInfo);
}
/*
** The following routines are implementations of the MemPage.xCellSize
** method.
**
** Compute the total number of bytes that a Cell needs in the cell
** data area of the btree-page. The return number includes the cell
** data header and the local payload, but not any overflow page or
** the space used by the cell pointer.
**
** The first implementation, cellSizePtr(), handles pages that contain
** payload, which is to say all index pages and left table pages. The
** second cellSizePtrNoPayload() implemention is a high-speed version
** for pages that contain no payload - internal table pages.
** cellSizePtrNoPayload() => table internal nodes
** cellSizePtr() => all index nodes & table leaf nodes
*/
static u16 cellSizePtr(MemPage *pPage, u8 *pCell){
u8 *pIter = pCell + pPage->childPtrSize; /* For looping over bytes of pCell */
@@ -1082,7 +1149,7 @@ static u16 cellSizePtr(MemPage *pPage, u8 *pCell){
** cell. If SQLITE_DEBUG is defined, an assert() at the bottom of
** this function verifies that this invariant is not violated. */
CellInfo debuginfo;
btreeParseCellPtr(pPage, pCell, &debuginfo);
pPage->xParseCell(pPage, pCell, &debuginfo);
#endif
assert( pPage->noPayload==0 );
@@ -1130,7 +1197,7 @@ static u16 cellSizePtrNoPayload(MemPage *pPage, u8 *pCell){
** cell. If SQLITE_DEBUG is defined, an assert() at the bottom of
** this function verifies that this invariant is not violated. */
CellInfo debuginfo;
btreeParseCellPtr(pPage, pCell, &debuginfo);
pPage->xParseCell(pPage, pCell, &debuginfo);
#endif
assert( pPage->childPtrSize==4 );
@@ -1159,7 +1226,7 @@ static void ptrmapPutOvflPtr(MemPage *pPage, u8 *pCell, int *pRC){
CellInfo info;
if( *pRC ) return;
assert( pCell!=0 );
btreeParseCellPtr(pPage, pCell, &info);
pPage->xParseCell(pPage, pCell, &info);
if( info.iOverflow ){
Pgno ovfl = get4byte(&pCell[info.iOverflow]);
ptrmapPut(pPage->pBt, ovfl, PTRMAP_OVERFLOW1, pPage->pgno, pRC);
@@ -1540,10 +1607,12 @@ static int decodeFlags(MemPage *pPage, int flagByte){
if( pPage->leaf ){
pPage->intKeyLeaf = 1;
pPage->noPayload = 0;
pPage->xParseCell = btreeParseCellPtr;
}else{
pPage->intKeyLeaf = 0;
pPage->noPayload = 1;
pPage->xCellSize = cellSizePtrNoPayload;
pPage->xParseCell = btreeParseCellPtrNoPayload;
}
pPage->maxLocal = pBt->maxLeaf;
pPage->minLocal = pBt->minLeaf;
@@ -1557,6 +1626,7 @@ static int decodeFlags(MemPage *pPage, int flagByte){
pPage->intKey = 0;
pPage->intKeyLeaf = 0;
pPage->noPayload = 0;
pPage->xParseCell = btreeParseCellPtrIndex;
pPage->maxLocal = pBt->maxLocal;
pPage->minLocal = pBt->minLocal;
}else{
@@ -3147,7 +3217,7 @@ static int modifyPagePointer(MemPage *pPage, Pgno iFrom, Pgno iTo, u8 eType){
u8 *pCell = findCell(pPage, i);
if( eType==PTRMAP_OVERFLOW1 ){
CellInfo info;
btreeParseCellPtr(pPage, pCell, &info);
pPage->xParseCell(pPage, pCell, &info);
if( info.iOverflow
&& pCell+info.iOverflow+3<=pPage->aData+pPage->maskPage
&& iFrom==get4byte(&pCell[info.iOverflow])
@@ -4992,7 +5062,7 @@ int sqlite3BtreeMovetoUnpacked(
** case this happens. */
void *pCellKey;
u8 * const pCellBody = pCell - pPage->childPtrSize;
btreeParseCellPtr(pPage, pCellBody, &pCur->info);
pPage->xParseCell(pPage, pCellBody, &pCur->info);
nCell = (int)pCur->info.nKey;
testcase( nCell<0 ); /* True if key size is 2^32 or more */
testcase( nCell==0 ); /* Invalid key size: 0x80 0x80 0x00 */
@@ -5781,7 +5851,7 @@ static int clearCell(
u32 ovflPageSize;
assert( sqlite3_mutex_held(pPage->pBt->mutex) );
btreeParseCellPtr(pPage, pCell, &info);
pPage->xParseCell(pPage, pCell, &info);
*pnSize = info.nSize;
if( info.iOverflow==0 ){
return SQLITE_OK; /* No overflow pages. Return without doing anything */
@@ -5935,7 +6005,7 @@ static int fillInCell(
#if SQLITE_DEBUG
{
CellInfo info;
btreeParseCellPtr(pPage, pCell, &info);
pPage->xParseCell(pPage, pCell, &info);
assert( nHeader=(int)(info.pPayload - pCell) );
assert( info.nKey==nKey );
assert( *pnSize == info.nSize );
@@ -6584,7 +6654,7 @@ static int ptrmapCheckPages(MemPage **apPage, int nPage){
u8 *z;
z = findCell(pPage, j);
btreeParseCellPtr(pPage, z, &info);
pPage->xParseCell(pPage, z, &info);
if( info.iOverflow ){
Pgno ovfl = get4byte(&z[info.iOverflow]);
ptrmapGet(pBt, ovfl, &e, &n);
@@ -7215,7 +7285,7 @@ static int balance_nonroot(
*/
CellInfo info;
j--;
btreeParseCellPtr(pNew, apCell[j], &info);
pNew->xParseCell(pNew, apCell[j], &info);
pCell = pTemp;
sz = 4 + putVarint(&pCell[4], info.nKey);
pTemp = 0;
@@ -8715,7 +8785,7 @@ static int checkTreePage(
pCheck->v1 = iPage;
pCheck->v2 = i;
pCell = findCell(pPage,i);
btreeParseCellPtr(pPage, pCell, &info);
pPage->xParseCell(pPage, pCell, &info);
sz = info.nPayload;
/* For intKey pages, check that the keys are in order.
*/