1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-11-11 01:42:22 +03:00

Make sure constraint checks occur in the correct order, even in the

presence of upserts.

FossilOrigin-Name: 07fb30c3de7ff396ae2ce8a0d20352b56f17a5db0af99a921c7bfe9bd4018115
This commit is contained in:
drh
2018-04-14 20:24:36 +00:00
parent 0b30a11645
commit 096fd476c1
3 changed files with 126 additions and 26 deletions

View File

@@ -1157,6 +1157,42 @@ static int checkConstraintUnchanged(Expr *pExpr, int *aiChng, int chngRowid){
return !w.eCode;
}
/*
** An instance of the ConstraintAddr object remembers the byte-code addresses
** for sections of the constraint checks that deal with uniqueness constraints
** on the rowid and on the upsert constraint.
**
** This information is passed into checkReorderConstraintChecks() to insert
** some OP_Goto operations so that the rowid and upsert constraints occur
** in the correct order relative to other constraints.
*/
typedef struct ConstraintAddr ConstraintAddr;
struct ConstraintAddr {
int ipkTop; /* Subroutine for rowid constraint check */
int upsertTop; /* Label for upsert constraint check subroutine */
int ipkBtm; /* Return opcode rowid constraint check */
int upsertBtm; /* upsert constraint returns to this label */
};
/*
** Generate any OP_Goto operations needed to cause constraints to be
** run that haven't already been run.
*/
static void reorderConstraintChecks(Vdbe *v, ConstraintAddr *p){
if( p->upsertTop ){
sqlite3VdbeGoto(v, p->upsertTop);
VdbeComment((v, "call upsert subroutine"));
sqlite3VdbeResolveLabel(v, p->upsertBtm);
p->upsertTop = 0;
}
if( p->ipkTop ){
sqlite3VdbeGoto(v, p->ipkTop);
VdbeComment((v, "call rowid constraint-check subroutine"));
sqlite3VdbeJumpHere(v, p->ipkBtm);
p->ipkTop = 0;
}
}
/*
** Generate code to do constraint checks prior to an INSERT or an UPDATE
** on table pTab.
@@ -1266,10 +1302,11 @@ void sqlite3GenerateConstraintChecks(
int addr1; /* Address of jump instruction */
int seenReplace = 0; /* True if REPLACE is used to resolve INT PK conflict */
int nPkField; /* Number of fields in PRIMARY KEY. 1 for ROWID tables */
int ipkTop = 0; /* Top of the rowid change constraint check */
int ipkBottom = 0; /* Bottom of the rowid change constraint check */
ConstraintAddr sAddr;/* Address information for constraint reordering */
Index *pUpIdx = 0; /* Index to which to apply the upsert */
u8 isUpdate; /* True if this is an UPDATE operation */
u8 bAffinityDone = 0; /* True if the OP_Affinity operation has been run */
int upsertBypass = 0; /* Address of Goto to bypass upsert subroutine */
isUpdate = regOldData!=0;
db = pParse->db;
@@ -1277,6 +1314,8 @@ void sqlite3GenerateConstraintChecks(
assert( v!=0 );
assert( pTab->pSelect==0 ); /* This table is not a VIEW */
nCol = pTab->nCol;
sAddr.ipkTop = 0;
sAddr.upsertTop = 0;
/* pPk is the PRIMARY KEY index for WITHOUT ROWID tables and NULL for
** normal rowid tables. nPkField is the number of key fields in the
@@ -1376,6 +1415,53 @@ void sqlite3GenerateConstraintChecks(
}
#endif /* !defined(SQLITE_OMIT_CHECK) */
/* UNIQUE and PRIMARY KEY constraints should be handled in the following
** order:
**
** (1) OE_Abort, OE_Fail, OE_Rollback, OE_Ignore
** (2) OE_Update
** (3) OE_Replace
**
** OE_Fail and OE_Ignore must happen before any changes are made.
** OE_Update guarantees that only a single row will change, so it
** must happen before OE_Replace. Technically, OE_Abort and OE_Rollback
** could happen in any order, but they are grouped up front for
** convenience.
**
** Constraint checking code is generated in this order:
** (A) The rowid constraint
** (B) Unique index constraints that do not have OE_Replace as their
** default conflict resolution strategy
** (C) Unique index that do use OE_Replace by default.
**
** The ordering of (2) and (3) is accomplished by making sure the linked
** list of indexes attached to a table puts all OE_Replace indexes last
** in the list. See sqlite3CreateIndex() for where that happens.
*/
/* If there is an ON CONFLICT clause without a constraint-target
** (In other words, one of "ON CONFLICT DO NOTHING" or
** "ON DUPLICATION KEY UPDATE") then change the overrideError to
** whichever is appropriate.
*/
if( pUpsert ){
if( pUpsert->pUpsertTarget==0 ){
if( pUpsert->pUpsertSet==0 ){
/* An ON CONFLICT DO NOTHING clause, without a constraint-target.
** Make all unique constraint resolution be OE_Ignore */
overrideError = OE_Ignore;
pUpsert = 0;
}else{
/* An ON DUPLICATE KEY UPDATE clause. All unique constraints
** do upsert processing */
overrideError = OE_Update;
}
}else if( (pUpIdx = pUpsert->pUpsertIdx)!=0 ){
sAddr.upsertTop = sqlite3VdbeMakeLabel(v);
sAddr.upsertBtm = sqlite3VdbeMakeLabel(v);
}
}
/* If rowid is changing, make sure the new rowid does not previously
** exist in the table.
*/
@@ -1400,7 +1486,7 @@ void sqlite3GenerateConstraintChecks(
}
/* figure out whether or not upsert applies in this case */
if( pUpsert && (pUpsert->pUpsertTarget==0 || pUpsert->pUpsertIdx==0) ){
if( pUpsert && pUpsert->pUpsertIdx==0 ){
if( pUpsert->pUpsertSet==0 ){
onError = OE_Ignore; /* DO NOTHING is the same as INSERT OR IGNORE */
}else{
@@ -1413,17 +1499,21 @@ void sqlite3GenerateConstraintChecks(
** to defer the running of the rowid conflict checking until after
** the UNIQUE constraints have run.
*/
if( onError==OE_Replace && overrideError!=OE_Replace ){
for(pIdx=pTab->pIndex; pIdx; pIdx=pIdx->pNext){
if( pIdx->onError==OE_Ignore || pIdx->onError==OE_Fail ){
ipkTop = sqlite3VdbeAddOp0(v, OP_Goto);
break;
}
}
assert( OE_Update>OE_Replace );
assert( OE_Ignore<OE_Replace );
assert( OE_Fail<OE_Replace );
assert( OE_Abort<OE_Replace );
assert( OE_Rollback<OE_Replace );
if( onError>=OE_Replace
&& onError!=overrideError
&& pTab->pIndex
){
sAddr.ipkTop = sqlite3VdbeAddOp0(v, OP_Goto)+1;
}
/* Check to see if the new rowid already exists in the table. Skip
** the following conflict logic if it does not. */
VdbeNoopComment((v, "constraint checks for ROWID"));
sqlite3VdbeAddOp3(v, OP_NotExists, iDataCur, addrRowidOk, regNewData);
VdbeCoverage(v);
@@ -1499,9 +1589,9 @@ void sqlite3GenerateConstraintChecks(
}
}
sqlite3VdbeResolveLabel(v, addrRowidOk);
if( ipkTop ){
ipkBottom = sqlite3VdbeAddOp0(v, OP_Goto);
sqlite3VdbeJumpHere(v, ipkTop);
if( sAddr.ipkTop ){
sAddr.ipkBtm = sqlite3VdbeAddOp0(v, OP_Goto);
sqlite3VdbeJumpHere(v, sAddr.ipkTop-1);
}
}
@@ -1519,12 +1609,17 @@ void sqlite3GenerateConstraintChecks(
int addrUniqueOk; /* Jump here if the UNIQUE constraint is satisfied */
if( aRegIdx[ix]==0 ) continue; /* Skip indices that do not change */
VdbeNoopComment((v, "constraint checks for %s", pIdx->zName));
if( bAffinityDone==0 ){
sqlite3TableAffinity(v, pTab, regNewData+1);
bAffinityDone = 1;
}
iThisCur = iIdxCur+ix;
addrUniqueOk = sqlite3VdbeMakeLabel(v);
if( pUpIdx==pIdx ){
addrUniqueOk = sAddr.upsertBtm;
}else{
addrUniqueOk = sqlite3VdbeMakeLabel(v);
}
/* Skip partial indices for which the WHERE clause is not true */
if( pIdx->pPartIdxWhere ){
@@ -1583,14 +1678,20 @@ void sqlite3GenerateConstraintChecks(
}else if( onError==OE_Default ){
onError = OE_Abort;
}
if( onError==OE_Replace ){
reorderConstraintChecks(v, &sAddr);
}
/* Figure out if the upsert clause applies to this index */
if( pUpsert && (pUpsert->pUpsertTarget==0 || pUpsert->pUpsertIdx==pIdx) ){
if( pUpIdx==pIdx ){
if( pUpsert->pUpsertSet==0 ){
onError = OE_Ignore; /* DO NOTHING is the same as INSERT OR IGNORE */
}else{
onError = OE_Update; /* DO UPDATE */
}
upsertBypass = sqlite3VdbeGoto(v, 0);
VdbeComment((v, "Upsert bypass"));
sqlite3VdbeResolveLabel(v, sAddr.upsertTop);
}
/* Collision detection may be omitted if all of the following are true:
@@ -1710,11 +1811,10 @@ void sqlite3GenerateConstraintChecks(
sqlite3VdbeResolveLabel(v, addrUniqueOk);
sqlite3ExprCachePop(pParse);
if( regR!=regIdx ) sqlite3ReleaseTempRange(pParse, regR, nPkField);
if( pUpIdx==pIdx ) sqlite3VdbeJumpHere(v, upsertBypass);
}
if( ipkTop ){
sqlite3VdbeGoto(v, ipkTop+1);
sqlite3VdbeJumpHere(v, ipkBottom);
}
reorderConstraintChecks(v, &sAddr);
*pbMayReplace = seenReplace;
VdbeModuleComment((v, "END: GenCnstCks(%d)", seenReplace));