1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-08-01 06:27:03 +03:00

Now appears to be working. More testing needed. Refinement of the

version-2 algorithm needed.

FossilOrigin-Name: cb035181d9fb5909696b8ec8f9c3eeb7a7dfb4b50e82e1d3f2d5ad150afcc0ff
This commit is contained in:
drh
2025-05-02 22:25:40 +00:00
parent f124ddf36a
commit bef9565485
3 changed files with 66 additions and 47 deletions

View File

@ -1298,12 +1298,12 @@ static void originSide(SQLiteRsync *p){
unsigned int nPage = 0;
unsigned int iHash = 1; /* Pgno for next hash to receive */
unsigned int nHash = 1; /* Number of pages per hash received */
unsigned int mxHash = 0; /* Maximum hash value received */
unsigned int lockBytePage = 0;
unsigned int szPg = 0;
sqlite3_stmt *pCkHash = 0; /* Verify hash on a single page */
sqlite3_stmt *pCkHashN = 0; /* Verify a multi-page hash */
sqlite3_stmt *pInsHash = 0; /* Record a bad hash */
unsigned int nMulti = 0; /* Multi-page hashes not matched */
char buf[200];
p->isReplica = 0;
@ -1387,38 +1387,37 @@ static void originSide(SQLiteRsync *p){
break;
}
case REPLICA_HASH: {
int bMatch = 0;
if( pCkHash==0 ){
runSql(p, "CREATE TEMP TABLE badHash("
" pgno INTEGER PRIMARY KEY,"
" sz INT)");
pCkHash = prepareStmt(p,
"SELECT pgno FROM sqlite_dbpage('main')"
" WHERE pgno=?1 AND hash(data)!=?3"
"SELECT hash(data)==?3 FROM sqlite_dbpage('main')"
" WHERE pgno=?1"
);
if( pCkHash==0 ) break;
pInsHash = prepareStmt(p, "INSERT INTO badHash VALUES(?1,?2)");
if( pInsHash==0 ) break;
}
p->nHashSent++;
if( p->zDebugFile ){
debugMessage(p, "<- REPLICA_HASH %u %u\n", iHash, nHash);
}
readBytes(p, 20, buf);
if( nHash>1 ){
if( pCkHashN==0 ){
pCkHashN = prepareStmt(p,
"WITH a1(pgno) AS "
"(VALUES(?1) UNION ALL SELECT pgno+1 FROM a1 WHERE pgno<?2)"
"SELECT count(*) FROM a1 CROSS JOIN sqlite_dbpage('main')"
" USING(pgno)"
" HAVING agghash(hash(data))!=?3");
"WITH c(n) AS "
" (VALUES(?1) UNION ALL SELECT n+1 FROM c WHERE n<?2)"
"SELECT agghash(hash(data))==?3"
" FROM c CROSS JOIN sqlite_dbpage('main') ON pgno=n"
);
if( pCkHashN==0 ) break;
}
sqlite3_bind_int64(pCkHashN, 1, iHash);
sqlite3_bind_int64(pCkHashN, 2, iHash + nHash);
readBytes(p, 20, buf);
sqlite3_bind_blob(pCkHash, 3, buf, 20, SQLITE_STATIC);
sqlite3_bind_int64(pCkHashN, 2, iHash + nHash - 1);
sqlite3_bind_blob(pCkHashN, 3, buf, 20, SQLITE_STATIC);
rc = sqlite3_step(pCkHashN);
if( rc==SQLITE_ROW ){
nMulti++;
bMatch = sqlite3_column_int(pCkHashN,0);
}else if( rc==SQLITE_ERROR ){
reportError(p, "SQL statement [%s] failed: %s",
sqlite3_sql(pCkHashN), sqlite3_errmsg(p->db));
@ -1426,16 +1425,24 @@ static void originSide(SQLiteRsync *p){
sqlite3_reset(pCkHashN);
}else{
sqlite3_bind_int64(pCkHash, 1, iHash);
readBytes(p, 20, buf);
sqlite3_bind_blob(pCkHash, 3, buf, 20, SQLITE_STATIC);
rc = sqlite3_step(pCkHash);
if( rc==SQLITE_ERROR ){
reportError(p, "SQL statement [%s] failed: %s",
sqlite3_sql(pCkHash), sqlite3_errmsg(p->db));
}else if( rc==SQLITE_ROW && sqlite3_column_int(pCkHash,0) ){
bMatch = 1;
}
sqlite3_reset(pCkHash);
}
if( rc==SQLITE_ROW ){
if( p->zDebugFile ){
debugMessage(p, "<- REPLICA_HASH %u %u %s %08x...\n",
iHash, nHash,
bMatch ? "match" : "fail",
*(unsigned int*)buf
);
}
if( !bMatch ){
sqlite3_bind_int64(pInsHash, 1, iHash);
sqlite3_bind_int64(pInsHash, 2, nHash);
rc = sqlite3_step(pInsHash);
@ -1445,30 +1452,32 @@ static void originSide(SQLiteRsync *p){
}
sqlite3_reset(pInsHash);
}
if( iHash+nHash>mxHash ) mxHash = iHash+nHash;
iHash += nHash;
break;
}
case REPLICA_READY: {
int nMulti = 0;
sqlite3_stmt *pStmt;
if( p->zDebugFile ){
debugMessage(p, "<- REPLICA_READY\n");
}
if( nMulti>0 ){
sqlite3_stmt *pStmt;
pStmt = prepareStmt(p,"SELECT pgno, sz FROM badHash WHERE sz>1");
if( pStmt==0 ) break;
while( sqlite3_step(pStmt)==SQLITE_ROW ){
unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt,0);
unsigned int cnt = (unsigned int)sqlite3_column_int64(pStmt,1);
writeByte(p, ORIGIN_DETAIL);
writeUint32(p, pgno);
writeUint32(p, cnt);
if( p->zDebugFile ){
debugMessage(p, "-> ORIGIN_DETAIL %u %u\n", pgno, cnt);
}
pStmt = prepareStmt(p,"SELECT pgno, sz FROM badHash WHERE sz>1");
if( pStmt==0 ) break;
while( sqlite3_step(pStmt)==SQLITE_ROW ){
unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt,0);
unsigned int cnt = (unsigned int)sqlite3_column_int64(pStmt,1);
writeByte(p, ORIGIN_DETAIL);
writeUint32(p, pgno);
writeUint32(p, cnt);
nMulti++;
if( p->zDebugFile ){
debugMessage(p, "-> ORIGIN_DETAIL %u %u\n", pgno, cnt);
}
sqlite3_finalize(pStmt);
}
sqlite3_finalize(pStmt);
if( nMulti ){
runSql(p, "DELETE FROM badHash WHERE sz>1");
nMulti = 0;
writeByte(p, ORIGIN_READY);
if( p->zDebugFile ) debugMessage(p, "-> ORIGIN_READY\n");
}else{
@ -1478,11 +1487,11 @@ static void originSide(SQLiteRsync *p){
sqlite3_finalize(pInsHash);
pCkHash = 0;
pInsHash = 0;
if( iHash+1<p->nPage ){
if( mxHash<p->nPage ){
runSql(p, "WITH RECURSIVE c(n) AS"
" (VALUES(%d) UNION ALL SELECT n+1 FROM c WHERE n<%d)"
" INSERT INTO badHash SELECT n, 1 FROM c",
iHash+1, p->nPage);
mxHash, p->nPage);
}
runSql(p, "DELETE FROM badHash WHERE pgno=%d", lockBytePage);
pStmt = prepareStmt(p,
@ -1551,7 +1560,7 @@ static void sendHashMessages(
"SELECT if(npg==1,"
" (SELECT hash(data) FROM sqlite_dbpage('replica') WHERE pgno=fpg),"
" (WITH RECURSIVE c(n) AS"
" (SELECT fpg UNION ALL SELECT n+1 FROM c WHERE n<fpg+npg)"
" (SELECT fpg UNION ALL SELECT n+1 FROM c WHERE n<fpg+npg-1)"
" SELECT agghash(hash(data))"
" FROM c CROSS JOIN sqlite_dbpage('replica') ON pgno=n)) AS hash,"
" fpg,"
@ -1570,9 +1579,18 @@ static void sendHashMessages(
debugMessage(p, "-> REPLICA_CONFIG %u %u\n", pgno, npg);
}
}
writeByte(p, REPLICA_HASH);
writeBytes(p, 20, a);
if( p->zDebugFile ) debugMessage(p, "-> REPLICA_HASH %u\n", iHash);
if( a==0 ){
if( p->zDebugFile ){
debugMessage(p, "# Oops: No hash for %u %u\n", pgno, npg);
}
}else{
writeByte(p, REPLICA_HASH);
writeBytes(p, 20, a);
if( p->zDebugFile ){
debugMessage(p, "-> REPLICA_HASH %u %u (%08x...)\n",
pgno, npg, *(unsigned int*)a);
}
}
p->nHashSent++;
iHash = pgno + npg;
nHash = npg;
@ -1770,14 +1788,15 @@ static void replicaSide(SQLiteRsync *p){
"replica is %d bytes", szOPage, szRPage);
break;
}
if( p->iProtocol<2 ){
if( p->iProtocol<2 || nRPage<=100 ){
runSql(p,
"WITH RECURSIVE c(n) AS"
"(VALUES(1) UNION ALL SELECT n+1 FROM c WHERE n<%d)"
"INSERT INTO sendHash(fpg, npg) SELECT n, 1 FROM c",
nRPage);
}else{
subdivideHashRange(p, 1, nRPage);
runSql(p,"INSERT INTO sendHash VALUES(1,1)");
subdivideHashRange(p, 2, nRPage);
}
sendHashMessages(p, 1, 1);
runSql(p, "PRAGMA writable_schema=ON");