1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-07-27 20:41:58 +03:00

Begin adding the sqlite3session_patchset() API to the sessions extension. This is an interim commit.

FossilOrigin-Name: 60a4565a8c44762a002cd02979317df5ca47e899
This commit is contained in:
dan
2014-08-15 20:15:49 +00:00
parent f3a3dfe9f1
commit 73b3c05590
6 changed files with 435 additions and 49 deletions

View File

@ -35,6 +35,7 @@ struct sqlite3_session {
struct sqlite3_changeset_iter {
u8 *aChangeset; /* Pointer to buffer containing changeset */
int nChangeset; /* Number of bytes in aChangeset */
int bPatchset; /* True if this is a patchset */
u8 *pNext; /* Pointer to next change within aChangeset */
int rc; /* Iterator error code */
sqlite3_stmt *pConflict; /* Points to conflicting row, if any */
@ -122,6 +123,7 @@ struct SessionTable {
**
** 1 byte: Constant 0x54 (capital 'T')
** Varint: Big-endian integer set to the number of columns in the table.
** nCol bytes: 0x01 for PK columns, 0x00 otherwise.
** N bytes: Unqualified table name (encoded using UTF-8). Nul-terminated.
**
** Followed by one or more changes to the table.
@ -130,6 +132,25 @@ struct SessionTable {
** 1 byte: The "indirect-change" flag.
** old.* record: (delete and update only)
** new.* record: (insert and update only)
**
** PATCHSET FORMAT:
**
** A patchset is also a collection of changes. It is similar to a changeset,
** but omits those fields that are not useful if no conflict resolution
** is required when applying the changeset.
**
** Each group of changes begins with a table header:
**
** 1 byte: Constant 0x50 (capital 'P')
** Varint: Big-endian integer set to the number of columns in the table.
** nCol bytes: 0x01 for PK columns, 0x00 otherwise.
** N bytes: Unqualified table name (encoded using UTF-8). Nul-terminated.
**
** Followed by one or more changes to the table.
**
** 1 byte: Either SQLITE_INSERT, UPDATE or DELETE.
** 1 byte: The "indirect-change" flag.
** single record: (PK fields for DELETE, or full record for INSERT/UPDATE).
*/
/*
@ -1449,6 +1470,7 @@ static void sessionAppendCol(
*/
static int sessionAppendUpdate(
SessionBuffer *pBuf, /* Buffer to append to */
int bPatchset, /* True for "patchset", 0 for "changeset" */
sqlite3_stmt *pStmt, /* Statement handle pointing at new row */
SessionChange *p, /* Object containing old values */
u8 *abPK /* Boolean array - true for PK columns */
@ -1506,15 +1528,23 @@ static int sessionAppendUpdate(
}
}
if( bChanged || abPK[i] ){
sessionAppendBlob(pBuf, pCsr, nAdvance, &rc);
}else{
sessionAppendByte(pBuf, 0, &rc);
/* If at least one field has been modified, this is not a no-op. */
if( bChanged ) bNoop = 0;
/* Add a field to the old.* record. This is omitted if this modules is
** currently generating a patchset. */
if( bPatchset==0 ){
if( bChanged || abPK[i] ){
sessionAppendBlob(pBuf, pCsr, nAdvance, &rc);
}else{
sessionAppendByte(pBuf, 0, &rc);
}
}
if( bChanged ){
/* Add a field to the new.* record. Or the only record if currently
** generating a patchset. */
if( bChanged || (bPatchset && abPK[i]) ){
sessionAppendCol(&buf2, pStmt, i, &rc);
bNoop = 0;
}else{
sessionAppendByte(&buf2, 0, &rc);
}
@ -1532,6 +1562,56 @@ static int sessionAppendUpdate(
return rc;
}
static int sessionAppendDelete(
SessionBuffer *pBuf, /* Buffer to append to */
int bPatchset, /* True for "patchset", 0 for "changeset" */
sqlite3_stmt *pStmt, /* Statement handle pointing at new row */
SessionChange *p, /* Object containing old values */
u8 *abPK /* Boolean array - true for PK columns */
){
int rc = SQLITE_OK;
sessionAppendByte(pBuf, SQLITE_DELETE, &rc);
sessionAppendByte(pBuf, p->bIndirect, &rc);
if( bPatchset==0 ){
sessionAppendBlob(pBuf, p->aRecord, p->nRecord, &rc);
}else{
int nCol = sqlite3_column_count(pStmt);
int i;
u8 *a = p->aRecord;
for(i=0; i<nCol; i++){
u8 *pStart = a;
int eType = *a++;
switch( eType ){
case 0:
case SQLITE_NULL:
assert( abPK[i]==0 );
break;
case SQLITE_FLOAT:
case SQLITE_INTEGER:
a += 8;
break;
default: {
int n;
a += sessionVarintGet(a, &n);
a += n;
break;
}
}
if( abPK[i] ){
sessionAppendBlob(pBuf, pStart, a-pStart, &rc);
}
}
assert( (a - p->aRecord)==p->nRecord );
}
return rc;
}
/*
** Formulate and prepare a SELECT statement to retrieve a row from table
** zTab in database zDb based on its primary key. i.e.
@ -1654,25 +1734,20 @@ static int sessionSelectBind(
*/
static void sessionAppendTableHdr(
SessionBuffer *pBuf,
int bPatchset,
SessionTable *pTab,
int *pRc
){
/* Write a table header */
sessionAppendByte(pBuf, 'T', pRc);
sessionAppendByte(pBuf, (bPatchset ? 'P' : 'T'), pRc);
sessionAppendVarint(pBuf, pTab->nCol, pRc);
sessionAppendBlob(pBuf, pTab->abPK, pTab->nCol, pRc);
sessionAppendBlob(pBuf, (u8 *)pTab->zName, (int)strlen(pTab->zName)+1, pRc);
}
/*
** Obtain a changeset object containing all changes recorded by the
** session object passed as the first argument.
**
** It is the responsibility of the caller to eventually free the buffer
** using sqlite3_free().
*/
int sqlite3session_changeset(
int sessionGenerateChangeset(
sqlite3_session *pSession, /* Session object */
int bPatchset, /* True for patchset, false for changeset */
int *pnChangeset, /* OUT: Size of buffer at *ppChangeset */
void **ppChangeset /* OUT: Buffer containing changeset */
){
@ -1711,7 +1786,7 @@ int sqlite3session_changeset(
}
/* Write a table header */
sessionAppendTableHdr(&buf, pTab, &rc);
sessionAppendTableHdr(&buf, bPatchset, pTab, &rc);
/* Build and compile a statement to execute: */
if( rc==SQLITE_OK ){
@ -1735,13 +1810,10 @@ int sqlite3session_changeset(
sessionAppendCol(&buf, pSel, iCol, &rc);
}
}else{
rc = sessionAppendUpdate(&buf, pSel, p, abPK);
rc = sessionAppendUpdate(&buf, bPatchset, pSel, p, abPK);
}
}else if( p->op!=SQLITE_INSERT ){
/* A DELETE change */
sessionAppendByte(&buf, SQLITE_DELETE, &rc);
sessionAppendByte(&buf, p->bIndirect, &rc);
sessionAppendBlob(&buf, p->aRecord, p->nRecord, &rc);
rc = sessionAppendDelete(&buf, bPatchset, pSel, p, abPK);
}
if( rc==SQLITE_OK ){
rc = sqlite3_reset(pSel);
@ -1769,6 +1841,36 @@ int sqlite3session_changeset(
return rc;
}
/*
** Obtain a changeset object containing all changes recorded by the
** session object passed as the first argument.
**
** It is the responsibility of the caller to eventually free the buffer
** using sqlite3_free().
*/
int sqlite3session_changeset(
sqlite3_session *pSession, /* Session object */
int *pnChangeset, /* OUT: Size of buffer at *ppChangeset */
void **ppChangeset /* OUT: Buffer containing changeset */
){
return sessionGenerateChangeset(pSession, 0, pnChangeset, ppChangeset);
}
/*
** Obtain a patchset object containing all changes recorded by the
** session object passed as the first argument.
**
** It is the responsibility of the caller to eventually free the buffer
** using sqlite3_free().
*/
int sqlite3session_patchset(
sqlite3_session *pSession, /* Session object */
int *pnPatchset, /* OUT: Size of buffer at *ppChangeset */
void **ppPatchset /* OUT: Buffer containing changeset */
){
return sessionGenerateChangeset(pSession, 1, pnPatchset, ppPatchset);
}
/*
** Enable or disable the session object passed as the first argument.
*/
@ -1866,13 +1968,16 @@ int sqlite3changeset_start(
static int sessionReadRecord(
u8 **paChange, /* IN/OUT: Pointer to binary record */
int nCol, /* Number of values in record */
u8 *abPK, /* Array of primary key flags, or NULL */
sqlite3_value **apOut /* Write values to this array */
){
int i; /* Used to iterate through columns */
u8 *aRec = *paChange; /* Cursor for the serialized record */
for(i=0; i<nCol; i++){
int eType = *aRec++; /* Type of value (SQLITE_NULL, TEXT etc.) */
int eType;
if( abPK && abPK[i]==0 ) continue;
eType = *aRec++; /* Type of value (SQLITE_NULL, TEXT etc.) */
assert( !apOut || apOut[i]==0 );
if( eType ){
if( apOut ){
@ -1952,8 +2057,9 @@ static int sessionChangesetNext(
}
aChange = p->pNext;
if( aChange[0]=='T' ){
if( aChange[0]=='T' || aChange[0]=='P' ){
int nByte; /* Bytes to allocate for apValue */
p->bPatchset = (aChange[0]=='P');
aChange++;
aChange += sessionVarintGet(aChange, &p->nCol);
p->abPK = (u8 *)aChange;
@ -1981,18 +2087,36 @@ static int sessionChangesetNext(
if( paRec ){ *paRec = aChange; }
/* If this is an UPDATE or DELETE, read the old.* record. */
if( p->op!=SQLITE_INSERT ){
p->rc = sessionReadRecord(&aChange, p->nCol, paRec?0:p->apValue);
if( p->op!=SQLITE_INSERT && (p->bPatchset==0 || p->op==SQLITE_DELETE) ){
u8 *abPK = p->bPatchset ? p->abPK : 0;
p->rc = sessionReadRecord(&aChange, p->nCol, abPK, paRec?0:p->apValue);
if( p->rc!=SQLITE_OK ) return p->rc;
}
/* If this is an INSERT or UPDATE, read the new.* record. */
if( p->op!=SQLITE_DELETE ){
p->rc = sessionReadRecord(&aChange, p->nCol, paRec?0:&p->apValue[p->nCol]);
sqlite3_value **apOut = (paRec ? 0 : &p->apValue[p->nCol]);
p->rc = sessionReadRecord(&aChange, p->nCol, 0, apOut);
if( p->rc!=SQLITE_OK ) return p->rc;
}
if( pnRec ){ *pnRec = (int)(aChange - *paRec); }
if( pnRec ){
*pnRec = (int)(aChange - *paRec);
}else if( p->bPatchset && p->op==SQLITE_UPDATE ){
/* If this is an UPDATE that is part of a patchset, then all PK and
** modified fields are present in the new.* record. The old.* record
** is currently completely empty. This block shifts the PK fields from
** new.* to old.*, to accommodate the code that reads these arrays. */
int i;
for(i=0; i<p->nCol; i++){
assert( p->apValue[i]==0 );
assert( p->abPK[i]==0 || p->apValue[i+p->nCol] );
if( p->abPK[i] ){
p->apValue[i] = p->apValue[i+p->nCol];
p->apValue[i+p->nCol] = 0;
}
}
}
p->pNext = aChange;
return SQLITE_ROW;
}
@ -2225,7 +2349,7 @@ int sqlite3changeset_invert(
int nByte;
u8 *aEnd = &aIn[i+2];
sessionReadRecord(&aEnd, nCol, 0);
sessionReadRecord(&aEnd, nCol, 0, 0);
aOut[i] = (eType==SQLITE_DELETE ? SQLITE_INSERT : SQLITE_DELETE);
aOut[i+1] = aIn[i+1];
nByte = (int)(aEnd - &aIn[i+2]);
@ -2249,9 +2373,9 @@ int sqlite3changeset_invert(
}
/* Read the old.* and new.* records for the update change. */
rc = sessionReadRecord(&aEnd, nCol, &apVal[0]);
rc = sessionReadRecord(&aEnd, nCol, 0, &apVal[0]);
if( rc==SQLITE_OK ){
rc = sessionReadRecord(&aEnd, nCol, &apVal[nCol]);
rc = sessionReadRecord(&aEnd, nCol, 0, &apVal[nCol]);
}
/* Write the header for the new UPDATE change. Same as the original. */
@ -2781,10 +2905,21 @@ static int sessionApplyOneOp(
if( op==SQLITE_DELETE ){
/* Bind values to the DELETE statement. */
rc = sessionBindRow(pIter, sqlite3changeset_old, nCol, 0, p->pDelete);
/* Bind values to the DELETE statement. If conflict handling is required,
** bind values for all columns and set bound variable (nCol+1) to true.
** Or, if conflict handling is not required, bind just the PK column
** values and, if it exists, set (nCol+1) to false. Conflict handling
** is not required if:
**
** * this is a patchset, or
** * (pbRetry==0), or
** * all columns of the table are PK columns (in this case there is
** no (nCol+1) variable to bind to).
*/
u8 *abPK = (pIter->bPatchset ? p->abPK : 0);
rc = sessionBindRow(pIter, sqlite3changeset_old, nCol, abPK, p->pDelete);
if( rc==SQLITE_OK && sqlite3_bind_parameter_count(p->pDelete)>nCol ){
rc = sqlite3_bind_int(p->pDelete, nCol+1, pbRetry==0);
rc = sqlite3_bind_int(p->pDelete, nCol+1, (pbRetry==0 || abPK));
}
if( rc!=SQLITE_OK ) return rc;
@ -2816,7 +2951,9 @@ static int sessionApplyOneOp(
rc = sessionBindValue(p->pUpdate, i*3+3, pNew);
}
}
if( rc==SQLITE_OK ) sqlite3_bind_int(p->pUpdate, nCol*3+1, pbRetry==0);
if( rc==SQLITE_OK ){
sqlite3_bind_int(p->pUpdate, nCol*3+1, pbRetry==0 || pIter->bPatchset);
}
if( rc!=SQLITE_OK ) return rc;
/* Attempt the UPDATE. In the case of a NOTFOUND or DATA conflict,
@ -3099,7 +3236,7 @@ static int sessionChangeMerge(
u8 *a1 = aRec;
assert( op2==SQLITE_UPDATE );
pNew->op = SQLITE_INSERT;
sessionReadRecord(&a1, pTab->nCol, 0);
sessionReadRecord(&a1, pTab->nCol, 0, 0);
sessionMergeRecord(&aCsr, pTab->nCol, pExist->aRecord, a1);
}else if( op1==SQLITE_DELETE ){ /* DELETE + INSERT */
assert( op2==SQLITE_INSERT );
@ -3112,8 +3249,8 @@ static int sessionChangeMerge(
u8 *a1 = pExist->aRecord;
u8 *a2 = aRec;
assert( op1==SQLITE_UPDATE );
sessionReadRecord(&a1, pTab->nCol, 0);
sessionReadRecord(&a2, pTab->nCol, 0);
sessionReadRecord(&a1, pTab->nCol, 0, 0);
sessionReadRecord(&a2, pTab->nCol, 0, 0);
pNew->op = SQLITE_UPDATE;
if( 0==sessionMergeUpdate(&aCsr, pTab, aRec, pExist->aRecord, a1, a2) ){
sqlite3_free(pNew);
@ -3274,7 +3411,7 @@ int sqlite3changeset_concat(
int i;
if( pTab->nEntry==0 ) continue;
sessionAppendTableHdr(&buf, pTab, &rc);
sessionAppendTableHdr(&buf, 0, pTab, &rc);
for(i=0; i<pTab->nChange; i++){
SessionChange *p;
for(p=pTab->apChange[i]; p; p=p->pNext){