1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-07-29 08:01:23 +03:00

Fix a problem with handling rebasing UPDATE changes for REPLACE conflict

resolution.

FossilOrigin-Name: f7bf71f1d47044e3cbc74018294b8af5ad52c2bb84954e99bbd4e9b8c36fc077
This commit is contained in:
dan
2018-03-16 18:02:47 +00:00
parent f1b40e8305
commit f01d3a7ef7
5 changed files with 243 additions and 49 deletions

View File

@ -232,8 +232,8 @@ struct SessionTable {
** statement.
**
** For a DELETE change, all fields within the record except those associated
** with PRIMARY KEY columns are set to "undefined". The PRIMARY KEY fields
** contain the values identifying the row to delete.
** with PRIMARY KEY columns are omitted. The PRIMARY KEY fields contain the
** values identifying the row to delete.
**
** For an UPDATE change, all fields except those associated with PRIMARY KEY
** columns and columns that are modified by the UPDATE are set to "undefined".
@ -3816,34 +3816,18 @@ static int sessionRebaseAdd(
assert( eType==SQLITE_CHANGESET_REPLACE||eType==SQLITE_CHANGESET_OMIT );
assert( eOp==SQLITE_DELETE || eOp==SQLITE_INSERT || eOp==SQLITE_UPDATE );
if( eType==SQLITE_CHANGESET_REPLACE ){
sessionAppendByte(&p->rebase, SQLITE_DELETE, &rc);
sessionAppendByte(&p->rebase, 0, &rc);
for(i=0; i<p->nCol; i++){
if( p->abPK[i]==0 ){
sessionAppendByte(&p->rebase, 0, &rc);
}else{
sqlite3_value *pVal = 0;
if( eOp==SQLITE_INSERT ){
sqlite3changeset_new(pIter, i, &pVal);
}else{
sqlite3changeset_old(pIter, i, &pVal);
}
sessionAppendValue(&p->rebase, pVal, &rc);
}
}
}else{
sessionAppendByte(&p->rebase, SQLITE_INSERT, &rc);
sessionAppendByte(&p->rebase, eOp==SQLITE_DELETE, &rc);
for(i=0; i<p->nCol; i++){
sqlite3_value *pVal = 0;
if( eOp!=SQLITE_INSERT && p->abPK[i] ){
sqlite3changeset_old(pIter, i, &pVal);
}else{
sqlite3changeset_new(pIter, i, &pVal);
}
sessionAppendValue(&p->rebase, pVal, &rc);
sessionAppendByte(&p->rebase,
(eOp==SQLITE_DELETE ? SQLITE_DELETE : SQLITE_INSERT), &rc
);
sessionAppendByte(&p->rebase, (eType==SQLITE_CHANGESET_REPLACE), &rc);
for(i=0; i<p->nCol; i++){
sqlite3_value *pVal = 0;
if( eOp==SQLITE_DELETE || (eOp==SQLITE_UPDATE && p->abPK[i]) ){
sqlite3changeset_old(pIter, i, &pVal);
}else{
sqlite3changeset_new(pIter, i, &pVal);
}
sessionAppendValue(&p->rebase, pVal, &rc);
}
return rc;
@ -5026,6 +5010,56 @@ static void sessionAppendRecordMerge(
}
}
static void sessionAppendPartialUpdate(
SessionBuffer *pBuf,
sqlite3_changeset_iter *pIter,
u8 *aRec, int nRec,
u8 *aChange, int nChange,
int *pRc
){
sessionBufferGrow(pBuf, 2+nRec+nChange, pRc);
if( *pRc==SQLITE_OK ){
int bData = 0;
u8 *pOut = &pBuf->aBuf[pBuf->nBuf];
int i;
u8 *a1 = aRec;
u8 *a2 = aChange;
*pOut++ = SQLITE_UPDATE;
*pOut++ = pIter->bIndirect;
for(i=0; i<pIter->nCol; i++){
int n1 = sessionSerialLen(a1);
int n2 = sessionSerialLen(a2);
if( pIter->abPK[i] || a2[0]==0 ){
if( !pIter->abPK[i] ) bData = 1;
memcpy(pOut, a1, n1);
pOut += n1;
}else{
*pOut++ = '\0';
}
a1 += n1;
a2 += n2;
}
if( bData ){
a2 = aChange;
for(i=0; i<pIter->nCol; i++){
int n1 = sessionSerialLen(a1);
int n2 = sessionSerialLen(a2);
if( pIter->abPK[i] || a2[0]==0 ){
memcpy(pOut, a1, n1);
pOut += n1;
}else{
*pOut++ = '\0';
}
a1 += n1;
a2 += n2;
}
pBuf->nBuf = (pOut - pBuf->aBuf);
}
}
}
static int sessionRebase(
sqlite3_rebaser *p, /* Rebaser hash table */
sqlite3_changeset_iter *pIter, /* Input data */
@ -5043,6 +5077,7 @@ static int sessionRebase(
while( SQLITE_ROW==sessionChangesetNext(pIter, &aRec, &nRec, &bNew) ){
SessionChange *pChange = 0;
int bDone = 0;
if( bNew ){
const char *zTab = pIter->zTab;
@ -5071,6 +5106,70 @@ static int sessionRebase(
if( pChange ){
assert( pChange->op==SQLITE_DELETE || pChange->op==SQLITE_INSERT );
switch( pIter->op ){
case SQLITE_INSERT:
if( pChange->op==SQLITE_INSERT ){
bDone = 1;
if( pChange->bIndirect==0 ){
sessionAppendByte(&sOut, SQLITE_UPDATE, &rc);
sessionAppendByte(&sOut, pIter->bIndirect, &rc);
sessionAppendBlob(&sOut, pChange->aRecord, pChange->nRecord, &rc);
sessionAppendBlob(&sOut, aRec, nRec, &rc);
}
}
break;
case SQLITE_UPDATE:
bDone = 1;
if( pChange->op==SQLITE_DELETE ){
if( pChange->bIndirect==0 ){
u8 *pCsr = aRec;
sessionSkipRecord(&pCsr, pIter->nCol);
sessionAppendByte(&sOut, SQLITE_INSERT, &rc);
sessionAppendByte(&sOut, pIter->bIndirect, &rc);
sessionAppendRecordMerge(&sOut, pIter->nCol, 1,
pCsr, nRec-(pCsr-aRec),
pChange->aRecord, pChange->nRecord, &rc
);
}
}else{
if( pChange->bIndirect==0 ){
u8 *pCsr = aRec;
sessionAppendByte(&sOut, SQLITE_UPDATE, &rc);
sessionAppendByte(&sOut, pIter->bIndirect, &rc);
sessionAppendRecordMerge(&sOut, pIter->nCol, 0,
aRec, nRec, pChange->aRecord, pChange->nRecord, &rc
);
sessionSkipRecord(&pCsr, pIter->nCol);
sessionAppendBlob(&sOut, pCsr, nRec - (pCsr-aRec), &rc);
}else{
sessionAppendPartialUpdate(&sOut, pIter,
aRec, nRec, pChange->aRecord, pChange->nRecord, &rc
);
}
}
break;
default:
assert( pIter->op==SQLITE_DELETE );
bDone = 1;
if( pChange->op==SQLITE_INSERT ){
sessionAppendByte(&sOut, SQLITE_DELETE, &rc);
sessionAppendByte(&sOut, pIter->bIndirect, &rc);
sessionAppendRecordMerge(&sOut, pIter->nCol, 1,
pChange->aRecord, pChange->nRecord, aRec, nRec, &rc
);
}
break;
}
}
if( bDone==0 ){
sessionAppendByte(&sOut, pIter->op, &rc);
sessionAppendByte(&sOut, pIter->bIndirect, &rc);
sessionAppendBlob(&sOut, aRec, nRec, &rc);
}
#if 0
/* If pChange is an INSERT, then rebase the change. If it is a
** DELETE, omit the change from the output altogether. */
if( pChange->op==SQLITE_INSERT ){
@ -5097,12 +5196,15 @@ static int sessionRebase(
}
}
}else{
sessionAppendByte(&sOut, pIter->op, &rc);
sessionAppendByte(&sOut, pIter->bIndirect, &rc);
if( pIter->op==SQLITE_INSERT ){
sessionAppendByte(&sOut, SQLITE_UPDATE, &rc);
sessionAppendByte(&sOut, pIter->bIndirect, &rc);
sessionAppendBlob(&sOut, pChange->aRecord, pChange->nRecord, &rc);
sessionAppendBlob(&sOut, aRec, nRec, &rc);
}else{
u8 *pCsr = aRec;
sessionAppendByte(&sOut, pIter->op, &rc);
sessionAppendByte(&sOut, pIter->bIndirect, &rc);
sessionAppendRecordMerge(&sOut, pIter->nCol, 0,
aRec, nRec, pChange->aRecord, pChange->nRecord, &rc
);
@ -5118,6 +5220,7 @@ static int sessionRebase(
sessionAppendByte(&sOut, pIter->bIndirect, &rc);
sessionAppendBlob(&sOut, aRec, nRec, &rc);
}
#endif
if( rc==SQLITE_OK && xOutput && sOut.nBuf>SESSIONS_STRM_CHUNK_SIZE ){
rc = xOutput(pOut, sOut.aBuf, sOut.nBuf);