1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-07-29 08:01:23 +03:00

Add more tests for LSM log file recovery. Fix a problem in recovering log

files that contain range deletes.

FossilOrigin-Name: e34eafd4c5b2bbf2735e136ad69b67bb4288ad4d01a0128d8e107ac46209a182
This commit is contained in:
dan
2017-07-03 09:00:18 +00:00
parent 24f6904e99
commit 05dda7ba8e
9 changed files with 107 additions and 46 deletions

View File

@ -199,9 +199,13 @@
#define LSM_LOG_WRITE 0x06
#define LSM_LOG_WRITE_CKSUM 0x07
#define LSM_LOG_DELETE 0x08
#define LSM_LOG_DELETE_CKSUM 0x09
#define LSM_LOG_DRANGE 0x0A
#define LSM_LOG_DRANGE_CKSUM 0x0B
/* Require a checksum every 32KB. */
#define LSM_CKSUM_MAXDATA (32*1024)
@ -653,6 +657,7 @@ static int logFlush(lsm_db *pDb, int eType){
*/
int lsmLogWrite(
lsm_db *pDb, /* Database handle */
int eType,
void *pKey, int nKey, /* Database key to write to log */
void *pVal, int nVal /* Database value (or nVal<0) to write */
){
@ -661,13 +666,19 @@ int lsmLogWrite(
int nReq; /* Bytes of space required in log */
int bCksum = 0; /* True to embed a checksum in this record */
assert( eType==LSM_WRITE || eType==LSM_DELETE || eType==LSM_DRANGE );
assert( LSM_LOG_WRITE==LSM_WRITE );
assert( LSM_LOG_DELETE==LSM_DELETE );
assert( LSM_LOG_DRANGE==LSM_DRANGE );
assert( (eType==LSM_LOG_DELETE)==(nVal<0) );
if( pDb->bUseLog==0 ) return LSM_OK;
pLog = pDb->pLogWriter;
/* Determine how many bytes of space are required, assuming that a checksum
** will be embedded in this record (even though it may not be). */
nReq = 1 + lsmVarintLen32(nKey) + 8 + nKey;
if( nVal>=0 ) nReq += lsmVarintLen32(nVal) + nVal;
if( eType!=LSM_LOG_DELETE ) nReq += lsmVarintLen32(nVal) + nVal;
/* Jump over the jump region if required. Set bCksum to true to tell the
** code below to include a checksum in the record if either (a) writing
@ -687,9 +698,10 @@ int lsmLogWrite(
** DELETE) or 2 (for WRITE) varints. */
assert( LSM_LOG_WRITE_CKSUM == (LSM_LOG_WRITE | 0x0001) );
assert( LSM_LOG_DELETE_CKSUM == (LSM_LOG_DELETE | 0x0001) );
*(a++) = (nVal>=0 ? LSM_LOG_WRITE : LSM_LOG_DELETE) | (u8)bCksum;
assert( LSM_LOG_DRANGE_CKSUM == (LSM_LOG_DRANGE | 0x0001) );
*(a++) = (u8)eType | (u8)bCksum;
a += lsmVarintPut32(a, nKey);
if( nVal>=0 ) a += lsmVarintPut32(a, nVal);
if( eType!=LSM_LOG_DELETE ) a += lsmVarintPut32(a, nVal);
if( bCksum ){
pLog->buf.n = (a - (u8 *)pLog->buf.z);
@ -699,7 +711,7 @@ int lsmLogWrite(
memcpy(a, pKey, nKey);
a += nKey;
if( nVal>=0 ){
if( eType!=LSM_LOG_DELETE ){
memcpy(a, pVal, nVal);
a += nVal;
}
@ -1005,6 +1017,8 @@ int lsmLogRecover(lsm_db *pDb){
break;
}
case LSM_LOG_DRANGE:
case LSM_LOG_DRANGE_CKSUM:
case LSM_LOG_WRITE:
case LSM_LOG_WRITE_CKSUM: {
int nKey;
@ -1013,7 +1027,7 @@ int lsmLogRecover(lsm_db *pDb){
logReaderVarint(&reader, &buf1, &nKey, &rc);
logReaderVarint(&reader, &buf2, &nVal, &rc);
if( eType==LSM_LOG_WRITE_CKSUM ){
if( eType==LSM_LOG_WRITE_CKSUM || eType==LSM_LOG_DRANGE_CKSUM ){
logReaderCksum(&reader, &buf1, &bEof, &rc);
}else{
bEof = logRequireCksum(&reader, nKey+nVal);
@ -1023,7 +1037,11 @@ int lsmLogRecover(lsm_db *pDb){
logReaderBlob(&reader, &buf1, nKey, 0, &rc);
logReaderBlob(&reader, &buf2, nVal, &aVal, &rc);
if( iPass==1 && rc==LSM_OK ){
rc = lsmTreeInsert(pDb, (u8 *)buf1.z, nKey, aVal, nVal);
if( eType==LSM_LOG_WRITE || eType==LSM_LOG_WRITE_CKSUM ){
rc = lsmTreeInsert(pDb, (u8 *)buf1.z, nKey, aVal, nVal);
}else{
rc = lsmTreeDelete(pDb, (u8 *)buf1.z, nKey, aVal, nVal);
}
}
break;
}