1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-07-29 08:01:23 +03:00

Add the experimental sqlite3session_changeset_size() API.

FossilOrigin-Name: b5564a6fd54875db1de884fdc0e5eeabcd6aa5595ad03a8a60843503e830a2d8
This commit is contained in:
dan
2021-04-21 20:52:17 +00:00
parent 7437c25b63
commit a23a873fbb
12 changed files with 305 additions and 35 deletions

View File

@ -49,6 +49,7 @@ struct sqlite3_session {
void *pFilterCtx; /* First argument to pass to xTableFilter */
int (*xTableFilter)(void *pCtx, const char *zTab);
i64 nMalloc; /* Number of bytes of data allocated */
i64 nMaxChangesetSize;
sqlite3_value *pZeroBlob; /* Value containing X'' */
sqlite3_session *pNext; /* Next session object on same db. */
SessionTable *pTable; /* List of attached tables */
@ -291,8 +292,9 @@ struct SessionTable {
** this structure stored in a SessionTable.aChange[] hash table.
*/
struct SessionChange {
int op; /* One of UPDATE, DELETE, INSERT */
int bIndirect; /* True if this change is "indirect" */
u8 op; /* One of UPDATE, DELETE, INSERT */
u8 bIndirect; /* True if this change is "indirect" */
int nMaxSize; /* Max size of eventual changeset record */
int nRecord; /* Number of bytes in buffer aRecord[] */
u8 *aRecord; /* Buffer containing old.* record */
SessionChange *pNext; /* For hash-table collisions */
@ -1121,6 +1123,10 @@ static int sessionInitTable(sqlite3_session *pSession, SessionTable *pTab){
if( 0==sqlite3_stricmp("sqlite_stat1", pTab->zName) ){
pTab->bStat1 = 1;
}
pSession->nMaxChangesetSize += (
1 + sessionVarintLen(pTab->nCol) + pTab->nCol + strlen(pTab->zName) + 1
);
}
}
return (pSession->rc || pTab->abPK==0);
@ -1166,6 +1172,103 @@ static int sessionStat1Depth(void *pCtx){
return p->hook.xDepth(p->hook.pCtx);
}
static int sessionUpdateMaxSize(
int op,
sqlite3_session *pSession, /* Session object pTab is attached to */
SessionTable *pTab, /* Table that change applies to */
SessionChange *pC /* Update pC->nMaxSize */
){
i64 nNew = 2;
if( pC->op==SQLITE_INSERT ){
if( op!=SQLITE_DELETE ){
int ii;
for(ii=0; ii<pTab->nCol; ii++){
sqlite3_value *p = 0;
pSession->hook.xNew(pSession->hook.pCtx, ii, &p);
sessionSerializeValue(0, p, &nNew);
}
}
}else if( op==SQLITE_DELETE ){
nNew += pC->nRecord;
if( sqlite3_preupdate_blobwrite(pSession->db)>=0 ){
nNew += pC->nRecord;
}
}else{
int ii;
u8 *pCsr = pC->aRecord;
for(ii=0; ii<pTab->nCol; ii++){
int bChanged = 1;
int nOld = 0;
int eType;
sqlite3_value *p = 0;
pSession->hook.xNew(pSession->hook.pCtx, ii, &p);
if( p==0 ){
return SQLITE_NOMEM;
}
eType = *pCsr++;
switch( eType ){
case SQLITE_NULL:
bChanged = sqlite3_value_type(p)!=SQLITE_NULL;
break;
case SQLITE_FLOAT:
case SQLITE_INTEGER: {
if( eType==sqlite3_value_type(p) ){
sqlite3_int64 iVal = sessionGetI64(pCsr);
if( eType==SQLITE_INTEGER ){
bChanged = (iVal!=sqlite3_value_int64(p));
}else{
double dVal;
memcpy(&dVal, &iVal, 8);
bChanged = (dVal!=sqlite3_value_double(p));
}
}
nOld = 8;
pCsr += 8;
break;
}
default: {
int nByte;
nOld = sessionVarintGet(pCsr, &nByte);
pCsr += nOld;
nOld += nByte;
assert( eType==SQLITE_TEXT || eType==SQLITE_BLOB );
if( eType==sqlite3_value_type(p)
&& nByte==sqlite3_value_bytes(p)
&& (nByte==0 || 0==memcmp(pCsr, sqlite3_value_blob(p), nByte))
){
bChanged = 0;
}
pCsr += nByte;
break;
}
}
if( bChanged && pTab->abPK[ii] ){
nNew = pC->nRecord + 2;
break;
}
if( bChanged ){
nNew += 1 + nOld;
sessionSerializeValue(0, p, &nNew);
}else if( pTab->abPK[ii] ){
nNew += 2 + nOld;
}else{
nNew += 2;
}
}
}
if( nNew>pC->nMaxSize ){
int nIncr = nNew - pC->nMaxSize;
pC->nMaxSize = nNew;
pSession->nMaxChangesetSize += nIncr;
}
return SQLITE_OK;
}
/*
** This function is only called from with a pre-update-hook reporting a
@ -1239,7 +1342,6 @@ static void sessionPreupdateOneChange(
/* Create a new change object containing all the old values (if
** this is an SQLITE_UPDATE or SQLITE_DELETE), or just the PK
** values (if this is an INSERT). */
SessionChange *pChange; /* New change object */
sqlite3_int64 nByte; /* Number of bytes to allocate */
int i; /* Used to iterate through columns */
@ -1265,13 +1367,13 @@ static void sessionPreupdateOneChange(
}
/* Allocate the change object */
pChange = (SessionChange *)sessionMalloc64(pSession, nByte);
if( !pChange ){
pC = (SessionChange *)sessionMalloc64(pSession, nByte);
if( !pC ){
rc = SQLITE_NOMEM;
goto error_out;
}else{
memset(pChange, 0, sizeof(SessionChange));
pChange->aRecord = (u8 *)&pChange[1];
memset(pC, 0, sizeof(SessionChange));
pC->aRecord = (u8 *)&pC[1];
}
/* Populate the change object. None of the preupdate_old(),
@ -1286,17 +1388,17 @@ static void sessionPreupdateOneChange(
}else if( pTab->abPK[i] ){
pSession->hook.xNew(pSession->hook.pCtx, i, &p);
}
sessionSerializeValue(&pChange->aRecord[nByte], p, &nByte);
sessionSerializeValue(&pC->aRecord[nByte], p, &nByte);
}
/* Add the change to the hash-table */
if( pSession->bIndirect || pSession->hook.xDepth(pSession->hook.pCtx) ){
pChange->bIndirect = 1;
pC->bIndirect = 1;
}
pChange->nRecord = nByte;
pChange->op = op;
pChange->pNext = pTab->apChange[iHash];
pTab->apChange[iHash] = pChange;
pC->nRecord = nByte;
pC->op = op;
pC->pNext = pTab->apChange[iHash];
pTab->apChange[iHash] = pC;
}else if( pC->bIndirect ){
/* If the existing change is considered "indirect", but this current
@ -1307,8 +1409,12 @@ static void sessionPreupdateOneChange(
pC->bIndirect = 0;
}
}
assert( rc==SQLITE_OK );
rc = sessionUpdateMaxSize(op, pSession, pTab, pC);
}
/* If an error has occurred, mark the session object as failed. */
error_out:
if( pTab->bStat1 ){
@ -2520,7 +2626,9 @@ int sqlite3session_changeset(
int *pnChangeset, /* OUT: Size of buffer at *ppChangeset */
void **ppChangeset /* OUT: Buffer containing changeset */
){
return sessionGenerateChangeset(pSession, 0, 0, 0, pnChangeset, ppChangeset);
int rc = sessionGenerateChangeset(pSession, 0, 0, 0, pnChangeset,ppChangeset);
assert( rc || pnChangeset==0 || *pnChangeset<=pSession->nMaxChangesetSize );
return rc;
}
/*
@ -2612,6 +2720,13 @@ sqlite3_int64 sqlite3session_memory_used(sqlite3_session *pSession){
return pSession->nMalloc;
}
/*
** Return the maximum size of sqlite3session_changeset() output.
*/
sqlite3_int64 sqlite3session_changeset_size(sqlite3_session *pSession){
return pSession->nMaxChangesetSize;
}
/*
** Do the work for either sqlite3changeset_start() or start_strm().
*/