diff --git a/manifest b/manifest index 96f52aa1fd..05160f3b5a 100644 --- a/manifest +++ b/manifest @@ -1,5 +1,5 @@ -C Do\snot\sallow\ssqlite3_rsync\sto\sconvert\sthe\sreplica\sfrom\sWAL-mode\sinto\nDELETE-mode,\sas\sthat\scan\sdisrupt\sexisting\sclients\son\sthe\sreplica\sside.\nDELETE-mode\sto\sWAL-mode\sconversions\sare\sallowed,\showever.\s\sSee\n[forum:/forumpost/6b575b66156673ee|forum\sthread\s6b575b66156]. -D 2025-05-02T11:18:09.023 +C This\sis\sthe\sstart\sof\san\sexperiment\sin\sgetting\ssqlite3_rsync\sto\suse\sless\nbandwidth\swhen\sthe\stwo\sdatabases\sare\svery\ssimilar,\sby\ssending\shashes\nover\sblocks\sof\spages\sinitially,\srather\sthan\sover\sindividual\spages,\sthen\nrequesting\smore\sdetail\swhen\shashes\sdo\snot\smatch. +D 2025-05-02T17:39:21.504 F .fossil-settings/binary-glob 61195414528fb3ea9693577e1980230d78a1f8b0a54c78cf1b9b24d0a409ed6a x F .fossil-settings/empty-dirs dbb81e8fc0401ac46a1491ab34a7f2c7c0452f2f06b54ebb845d024ca8283ef1 F .fossil-settings/ignore-glob 35175cdfcf539b2318cb04a9901442804be81cd677d8b889fcc9149c21f239ea @@ -2189,7 +2189,7 @@ F tool/spellsift.tcl 52b4b04dc4333c7ab024f09d9d66ed6b6f7c6eb00b38497a09f338fa55d F tool/split-sqlite3c.tcl 07e18a1d8cc3f6b3a4a1f3528e63c9b29a5c8a7bca0b8d394b231da464ce1247 F tool/sqldiff.c 134be7866be19f8beb32043d5aea5657f01aaeae2df8d33d758ff722c78666b9 F tool/sqlite3_analyzer.c.in 14f02cb5ec3c264cd6107d1f1dad77092b1cf440fc196c30b69ae87b56a1a43b -F tool/sqlite3_rsync.c a8e1962d9e0418b37d6865e483640c49498efe64bf542022e845b056f6eb9cce +F tool/sqlite3_rsync.c 438ffaf829181863ea941c8e6d5faabfe9161d7a464b3dce1857ac2fabe09273 F tool/sqltclsh.c.in 1bcc2e9da58fadf17b0bf6a50e68c1159e602ce057210b655d50bad5aaaef898 F tool/sqltclsh.tcl 862f4cf1418df5e1315b5db3b5ebe88969e2a784525af5fbf9596592f14ed848 F tool/src-verify.c d00f93263aa2fa6ba0cba0106d95458e6effb94fdb5fc634f56834f90c05bbb4 @@ -2207,8 +2207,11 @@ F tool/version-info.c 3b36468a90faf1bbd59c65fd0eb66522d9f941eedd364fabccd7227350 F tool/warnings-clang.sh bbf6a1e685e534c92ec2bfba5b1745f34fb6f0bc2a362850723a9ee87c1b31a7 F tool/warnings.sh 49a486c5069de041aedcbde4de178293e0463ae9918ecad7539eedf0ec77a139 F tool/win/sqlite.vsix deb315d026cc8400325c5863eef847784a219a2f -P 4b53603fe468c0c28b818762917e41bdd870de6d4cc143688f1cdea3136c81a4 -R 77c0a71cca6be2a9f28e992f31255bba +P 660a035b6ce6684d429b882133e032181cc1664f4efadf1bc0e4ae27d45071c4 +R 2f18aca9a98c725fdd5da1ebbb1e7566 +T *branch * faster-rsync +T *sym-faster-rsync * +T -sym-trunk * U drh -Z 4ae06cab268d08247b608c21e7458e1c +Z 08dacd140b0440acc75d69379c13dfa6 # Remove this line to create a well-formed Fossil manifest. diff --git a/manifest.uuid b/manifest.uuid index 0d00d48c76..9d75464a6e 100644 --- a/manifest.uuid +++ b/manifest.uuid @@ -1 +1 @@ -660a035b6ce6684d429b882133e032181cc1664f4efadf1bc0e4ae27d45071c4 +266b4b8f0104bd4b1cff87ed78b0223006bf661a9650294a2b330d50c7ee8a0c diff --git a/tool/sqlite3_rsync.c b/tool/sqlite3_rsync.c index 34faaf0fd1..6fb3650949 100644 --- a/tool/sqlite3_rsync.c +++ b/tool/sqlite3_rsync.c @@ -75,20 +75,26 @@ struct SQLiteRsync { /* Magic numbers to identify particular messages sent over the wire. */ +/**** Baseline: protocol version 1 ****/ #define ORIGIN_BEGIN 0x41 /* Initial message */ #define ORIGIN_END 0x42 /* Time to quit */ #define ORIGIN_ERROR 0x43 /* Error message from the remote */ #define ORIGIN_PAGE 0x44 /* New page data */ #define ORIGIN_TXN 0x45 /* Transaction commit */ #define ORIGIN_MSG 0x46 /* Informational message */ +/**** Added in protocol version 2 ****/ +#define ORIGIN_DETAIL 0x47 /* Request finer-grain hash info */ +#define ORIGIN_READY 0x48 /* Ready for next round of hash exchanges */ +/**** Baseline: protocol version 1 ****/ #define REPLICA_BEGIN 0x61 /* Welcome message */ #define REPLICA_ERROR 0x62 /* Error. Report and quit. */ #define REPLICA_END 0x63 /* Replica wants to stop */ #define REPLICA_HASH 0x64 /* One or more pages hashes to report */ #define REPLICA_READY 0x65 /* Read to receive page content */ #define REPLICA_MSG 0x66 /* Informational message */ - +/**** Added in protocol version 2 ****/ +#define REPLICA_CONFIG 0x67 /* Hash exchange configuration */ /**************************************************************************** ** Beginning of the popen2() implementation copied from Fossil ************* @@ -796,11 +802,49 @@ static void hashFunc( sqlite3_result_blob(context, HashFinal(&cx), 160/8, SQLITE_TRANSIENT); } +/* +** Implementation of the agghash(X) function. +** +** Return a 160-bit BLOB which is the hash of the concatenation +** of all X inputs. +*/ +static void agghashStep( + sqlite3_context *context, + int argc, + sqlite3_value **argv +){ + HashContext *pCx; + int eType = sqlite3_value_type(argv[0]); + int nByte = sqlite3_value_bytes(argv[0]); + if( eType==SQLITE_NULL ) return; + pCx = (HashContext*)sqlite3_aggregate_context(context, sizeof(*pCx)); + if( pCx==0 ) return; + if( pCx->iSize==0 ) HashInit(pCx, 160); + if( eType==SQLITE_BLOB ){ + HashUpdate(pCx, sqlite3_value_blob(argv[0]), nByte); + }else{ + HashUpdate(pCx, sqlite3_value_text(argv[0]), nByte); + } +} +static void agghashFinal(sqlite3_context *context){ + HashContext *pCx = (HashContext*)sqlite3_aggregate_context(context, 0); + if( pCx ){ + sqlite3_result_blob(context, HashFinal(pCx), 160/8, SQLITE_TRANSIENT); + } +} + /* Register the hash function */ static int hashRegister(sqlite3 *db){ - return sqlite3_create_function(db, "hash", 1, + int rc; + rc = sqlite3_create_function(db, "hash", 1, SQLITE_UTF8 | SQLITE_INNOCUOUS | SQLITE_DETERMINISTIC, 0, hashFunc, 0, 0); + if( rc==SQLITE_OK ){ + rc = sqlite3_create_function(db, "agghash", 1, + SQLITE_UTF8 | SQLITE_INNOCUOUS | SQLITE_DETERMINISTIC, + 0, 0, agghashStep, agghashFinal); + } + return rc; } /* End of the hashing logic @@ -1192,6 +1236,13 @@ static void closeDb(SQLiteRsync *p){ ** nPage, and szPage. Then enter a loop responding to message from ** the replica: ** +** REPLICA_BEGIN iProtocol +** +** An optional message sent by the replica in response to the +** prior ORIGIN_BEGIN with a counter-proposal for the protocol +** level. If seen, try to reduce the protocol level to what is +** requested and send a new ORGIN_BEGIN. +** ** REPLICA_ERROR size text ** ** Report an error from the replica and quit @@ -1202,24 +1253,36 @@ static void closeDb(SQLiteRsync *p){ ** ** REPLICA_HASH hash ** -** The argument is the 20-byte SHA1 hash for the next page -** page hashes appear in sequential order with no gaps. +** The argument is the 20-byte SHA1 hash for the next page or +** block of pages. Hashes appear in sequential order with no gaps, +** unless there is an intervening REPLICA_CONFIG message. +** +** REPLICA_CONFIG pgno cnt +** +** Set counters used by REPLICA_HASH. The next hash will start +** on page pgno and all subsequent hashes will cover cnt pages +** each. Note that for a multi-page hash, the hash value is +** actually a hash of the individual page hashes. ** ** REPLICA_READY ** ** The replica has sent all the hashes that it intends to send. ** This side (the origin) can now start responding with page -** content for pages that do not have a matching hash. +** content for pages that do not have a matching hash or with +** ORIGIN_DETAIL messages with requests for more detail. */ static void originSide(SQLiteRsync *p){ int rc = 0; int c = 0; unsigned int nPage = 0; - unsigned int iPage = 0; + unsigned int iHash = 1; /* Pgno for next hash to receive */ + unsigned int nHash = 1; /* Number of pages per hash received */ unsigned int lockBytePage = 0; unsigned int szPg = 0; - sqlite3_stmt *pCkHash = 0; - sqlite3_stmt *pInsHash = 0; + sqlite3_stmt *pCkHash = 0; /* Verify hash on a single page */ + sqlite3_stmt *pCkHashN = 0; /* Verify a multi-page hash */ + sqlite3_stmt *pInsHash = 0; /* Record a bad hash */ + unsigned int nMulti = 0; /* Multi-page hashes not matched */ char buf[200]; p->isReplica = 0; @@ -1270,11 +1333,16 @@ static void originSide(SQLiteRsync *p){ ** that is larger than what it knows about. The replica sends back ** a counter-proposal of an earlier protocol which the origin can ** accept by resending a new ORIGIN_BEGIN. */ - p->iProtocol = readByte(p); - writeByte(p, ORIGIN_BEGIN); - writeByte(p, p->iProtocol); - writePow2(p, p->szPage); - writeUint32(p, p->nPage); + u8 newProtocol = readByte(p); + if( newProtocol < p->iProtocol ){ + p->iProtocol = newProtocol; + writeByte(p, ORIGIN_BEGIN); + writeByte(p, p->iProtocol); + writePow2(p, p->szPage); + writeUint32(p, p->nPage); + }else{ + reportError(p, "Invalid REPLICA_BEGIN reply"); + } break; } case REPLICA_MSG: @@ -1282,25 +1350,60 @@ static void originSide(SQLiteRsync *p){ readAndDisplayMessage(p, c); break; } + case REPLICA_CONFIG: { + readUint32(p, &iHash); + readUint32(p, &nHash); + break; + } case REPLICA_HASH: { if( pCkHash==0 ){ - runSql(p, "CREATE TEMP TABLE badHash(pgno INTEGER PRIMARY KEY)"); + runSql(p, "CREATE TEMP TABLE badHash(" + " pgno INTEGER PRIMARY KEY," + " sz INT)"); pCkHash = prepareStmt(p, "SELECT pgno FROM sqlite_dbpage('main')" - " WHERE pgno=?1 AND hash(data)!=?2" + " WHERE pgno=?1 AND hash(data)!=?3" ); if( pCkHash==0 ) break; - pInsHash = prepareStmt(p, "INSERT INTO badHash VALUES(?)"); + pInsHash = prepareStmt(p, "INSERT INTO badHash VALUES(?1,?2)"); if( pInsHash==0 ) break; } p->nHashSent++; - iPage++; - sqlite3_bind_int64(pCkHash, 1, iPage); - readBytes(p, 20, buf); - sqlite3_bind_blob(pCkHash, 2, buf, 20, SQLITE_STATIC); - rc = sqlite3_step(pCkHash); + if( nHash>1 ){ + if( pCkHashN==0 ){ + pCkHashN = prepareStmt(p, + "WITH a1(pgno) AS " + "(VALUES(?1) UNION ALL SELECT pgno+1 FROM a1 WHERE pgnodb)); + } + sqlite3_reset(pCkHashN); + }else{ + sqlite3_bind_int64(pCkHash, 1, iHash); + readBytes(p, 20, buf); + sqlite3_bind_blob(pCkHash, 3, buf, 20, SQLITE_STATIC); + rc = sqlite3_step(pCkHash); + if( rc==SQLITE_ERROR ){ + reportError(p, "SQL statement [%s] failed: %s", + sqlite3_sql(pCkHash), sqlite3_errmsg(p->db)); + } + sqlite3_reset(pCkHash); + } if( rc==SQLITE_ROW ){ - sqlite3_bind_int64(pInsHash, 1, sqlite3_column_int64(pCkHash, 0)); + sqlite3_bind_int64(pInsHash, 1, iHash); + sqlite3_bind_int64(pInsHash, 2, nHash); rc = sqlite3_step(pInsHash); if( rc!=SQLITE_DONE ){ reportError(p, "SQL statement [%s] failed: %s", @@ -1308,42 +1411,57 @@ static void originSide(SQLiteRsync *p){ } sqlite3_reset(pInsHash); } - else if( rc!=SQLITE_DONE ){ - reportError(p, "SQL statement [%s] failed: %s", - sqlite3_sql(pCkHash), sqlite3_errmsg(p->db)); - } - sqlite3_reset(pCkHash); + iHash += nHash; break; } case REPLICA_READY: { - sqlite3_stmt *pStmt; - sqlite3_finalize(pCkHash); - sqlite3_finalize(pInsHash); - pCkHash = 0; - pInsHash = 0; - if( iPage+1nPage ){ - runSql(p, "WITH RECURSIVE c(n) AS" - " (VALUES(%d) UNION ALL SELECT n+1 FROM c WHERE n<%d)" - " INSERT INTO badHash SELECT n FROM c", - iPage+1, p->nPage); + if( nMulti>0 ){ + sqlite3_stmt *pStmt; + pStmt = prepareStmt(p,"SELECT pgno, sz FROM badHash WHERE sz>1"); + if( pStmt==0 ) break; + while( sqlite3_step(pStmt)==SQLITE_ROW ){ + writeByte(p, ORIGIN_DETAIL); + writeUint32(p, sqlite3_column_int(pStmt, 0)); + writeUint32(p, sqlite3_column_int(pStmt, 1)); + } + sqlite3_finalize(pStmt); + runSql(p, "DELETE FROM badHash WHERE sz>1"); + nMulti = 0; + writeByte(p, ORIGIN_READY); + }else{ + sqlite3_stmt *pStmt; + sqlite3_finalize(pCkHash); + sqlite3_finalize(pCkHashN); + sqlite3_finalize(pInsHash); + pCkHash = 0; + pInsHash = 0; + if( iHash+1nPage ){ + runSql(p, "WITH RECURSIVE c(n) AS" + " (VALUES(%d) UNION ALL SELECT n+1 FROM c WHERE n<%d)" + " INSERT INTO badHash SELECT n, 1 FROM c", + iHash+1, p->nPage); + } + runSql(p, "DELETE FROM badHash WHERE pgno=%d", lockBytePage); + pStmt = prepareStmt(p, + "SELECT pgno, data" + " FROM badHash JOIN sqlite_dbpage('main') USING(pgno)"); + if( pStmt==0 ) break; + while( sqlite3_step(pStmt)==SQLITE_ROW + && p->nErr==0 + && p->nWrErr==0 + ){ + unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt,0); + const void *pContent = sqlite3_column_blob(pStmt, 1); + writeByte(p, ORIGIN_PAGE); + writeUint32(p, pgno); + writeBytes(p, szPg, pContent); + p->nPageSent++; + } + sqlite3_finalize(pStmt); + writeByte(p, ORIGIN_TXN); + writeUint32(p, nPage); + writeByte(p, ORIGIN_END); } - runSql(p, "DELETE FROM badHash WHERE pgno=%d", lockBytePage); - pStmt = prepareStmt(p, - "SELECT pgno, data" - " FROM badHash JOIN sqlite_dbpage('main') USING(pgno)"); - if( pStmt==0 ) break; - while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){ - unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt,0); - const void *pContent = sqlite3_column_blob(pStmt, 1); - writeByte(p, ORIGIN_PAGE); - writeUint32(p, pgno); - writeBytes(p, szPg, pContent); - p->nPageSent++; - } - sqlite3_finalize(pStmt); - writeByte(p, ORIGIN_TXN); - writeUint32(p, nPage); - writeByte(p, ORIGIN_END); fflush(p->pOut); break; } @@ -1360,6 +1478,88 @@ static void originSide(SQLiteRsync *p){ closeDb(p); } +/* +** Send a REPLICA_HASH message for each entry in the sendHash table. +** The sendHash table looks like this: +** +** CREATE TABLE sendHash( +** fpg INTEGER PRIMARY KEY, -- Page number of the hash +** npg INT -- Number of pages in this hash +** ); +** +** If iHash is page number for the next page that the origin will +** be expecting, and nHash is the number of pages that the origin will +** be expecting in the hash that follows. Send a REPLICA_CONFIG message +** if either of these values if not correct. +*/ +static void sendHashMessages( + SQLiteRsync *p, /* The replica-side of the sync */ + unsigned int iHash, /* Next page expected by origin */ + unsigned int nHash /* Next number of pages expected by origin */ +){ + sqlite3_stmt *pStmt; + pStmt = prepareStmt(p, + "SELECT if(npg==1," + " (SELECT hash(data) FROM sqlite_dbpage('replica') WHERE pgno=fpg)," + " (WITH RECURSIVE c(n) AS" + " (SELECT fpg UNION ALL SELECT n+1 FROM c WHERE nnErr==0 && p->nWrErr==0 ){ + const unsigned char *a = sqlite3_column_blob(pStmt, 0); + unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt, 1); + unsigned int npg = (unsigned int)sqlite3_column_int64(pStmt, 2); + if( pgno!=iHash || npg!=nHash ){ + writeByte(p, REPLICA_CONFIG); + writeUint32(p, pgno); + writeUint32(p, npg); + } + writeByte(p, REPLICA_HASH); + writeBytes(p, 20, a); + p->nHashSent++; + iHash = pgno + npg; + nHash = npg; + } + sqlite3_finalize(pStmt); + runSql(p, "DELETE FROM sendHash"); + writeByte(p, REPLICA_READY); + fflush(p->pOut); +} + +/* +** Make entries in the sendHash table to send hashes for +** npg (mnemonic: Number of PaGes) pages starting with fpg +** (mnemonic: First PaGe). +*/ +static void subdivideHashRange( + SQLiteRsync *p, /* The replica-side of the sync */ + unsigned int fpg, /* First page of the range */ + unsigned int npg /* Number of pages */ +){ + unsigned int nChunk; /* How many pages to request per hash */ + sqlite3_uint64 iEnd; /* One more than the last page */ + if( npg<=30 ){ + nChunk = 1; + }else if( npg<=1000 ){ + nChunk = 30; + }else{ + nChunk = 1000; + } + iEnd = fpg; + iEnd += npg; + runSql(p, + "WITH RECURSIVE c(n) AS" + " (VALUES(%u) UNION ALL SELECT n+%u FROM c WHERE n<%llu)" + "REPLACE INTO sendHash(fpg,npg)" + " SELECT n, min(%llu-fpg,%u) FROM c", + fpg, nChunk, iEnd, iEnd, nChunk + ); +} + /* ** Run the replica-side protocol. The protocol is passive in the sense ** that it only response to message from the origin side. @@ -1370,15 +1570,35 @@ static void originSide(SQLiteRsync *p){ ** each page in the origin database (sent as a single-byte power-of-2), ** and the number of pages in the origin database. ** This procedure checks compatibility, and if everything is ok, -** it starts sending hashes of pages already present back to the origin. +** it starts sending hashes back to the origin using REPLICA_HASH +** and/or REPLICA_CONFIG message, followed by a single REPLICA_READY. +** REPLICA_CONFIG is only sent if the protocol is 2 or greater. ** -** ORIGIN_ERROR size text +** ORIGIN_ERROR size text ** -** Report the received error and quit. +** Report an error and quit. ** -** ORIGIN_PAGE pgno content +** ORIGIN_DETAIL pgno cnt ** -** Update the content of the given page. +** The origin reports that a multi-page hash starting at pgno and +** spanning cnt pages failed to match. The origin is requesting +** details (more REPLICA_HASH message with a smaller cnt). The +** replica must wait on ORIGIN_READY before sending its reply. +** +** ORIGIN_READY +** +** After sending one or more ORIGIN_DETAIL messages, the ORIGIN_READY +** is sent by the origin to indicate that it has finished sending +** requests for detail and is ready for the replicate to reply +** with a new round of REPLICA_CONFIG and REPLICA_HASH messages. +** +** ORIGIN_PAGE pgno content +** +** Once the origin believes it knows exactly which pages need to be +** updated in the replica, it starts sending those pages using these +** messages. These messages will only appear immediately after +** REPLICA_READY. The origin never mixes ORIGIN_DETAIL and +** ORIGIN_PAGE messages in the same batch. ** ** ORIGIN_TXN pgno ** @@ -1418,7 +1638,6 @@ static void replicaSide(SQLiteRsync *p){ unsigned int nOPage = 0; unsigned int nRPage = 0, szRPage = 0; int rc = 0; - sqlite3_stmt *pStmt = 0; closeDb(p); p->iProtocol = readByte(p); @@ -1458,6 +1677,12 @@ static void replicaSide(SQLiteRsync *p){ closeDb(p); break; } + runSql(p, + "CREATE TABLE sendHash(" + " fpg INTEGER PRIMARY KEY," /* The page number of hash to send */ + " npg INT" /* Number of pages in this hash */ + ")" + ); hashRegister(p->db); if( runSqlReturnUInt(p, &nRPage, "PRAGMA replica.page_count") ){ break; @@ -1484,23 +1709,30 @@ static void replicaSide(SQLiteRsync *p){ "replica is %d bytes", szOPage, szRPage); break; } - - pStmt = prepareStmt(p, - "SELECT hash(data) FROM sqlite_dbpage('replica')" - " WHERE pgno<=min(%d,%d)" - " ORDER BY pgno", nRPage, nOPage); - while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){ - const unsigned char *a = sqlite3_column_blob(pStmt, 0); - writeByte(p, REPLICA_HASH); - writeBytes(p, 20, a); - p->nHashSent++; + if( p->iProtocol<2 ){ + runSql(p, + "WITH RECURSIVE c(n) AS" + "(VALUES(1) UNION ALL SELECT n+1 FROM c WHERE n<%d)" + "INSERT INTO sendHash(fpg, npg) SELECT n, 1 FROM c", + nRPage); + }else{ + subdivideHashRange(p, 1, nRPage); } - sqlite3_finalize(pStmt); - writeByte(p, REPLICA_READY); - fflush(p->pOut); + sendHashMessages(p, 1, 1); runSql(p, "PRAGMA writable_schema=ON"); break; } + case ORIGIN_DETAIL: { + unsigned int fpg, npg; + readUint32(p, &fpg); + readUint32(p, &npg); + subdivideHashRange(p, fpg, npg); + break; + } + case ORIGIN_READY: { + sendHashMessages(p, 0, 0); + break; + } case ORIGIN_TXN: { unsigned int nOPage = 0; readUint32(p, &nOPage);