1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-11-11 01:42:22 +03:00

Enhance UPSERT so that it allows multiple ON CONFLICT clauses and does

not require a conflict target for DO UPDATE.

FossilOrigin-Name: 6b01a24daab1e5bcb0768ebf994368d941b1dfc217bf6b661211d900331e68cf
This commit is contained in:
drh
2020-12-14 15:39:12 +00:00
7 changed files with 772 additions and 169 deletions

View File

@@ -975,6 +975,7 @@ void sqlite3Insert(
}
#ifndef SQLITE_OMIT_UPSERT
if( pUpsert ){
Upsert *pNx;
if( IsVirtual(pTab) ){
sqlite3ErrorMsg(pParse, "UPSERT not implemented for virtual table \"%s\"",
pTab->zName);
@@ -988,13 +989,17 @@ void sqlite3Insert(
goto insert_cleanup;
}
pTabList->a[0].iCursor = iDataCur;
pUpsert->pUpsertSrc = pTabList;
pUpsert->regData = regData;
pUpsert->iDataCur = iDataCur;
pUpsert->iIdxCur = iIdxCur;
if( pUpsert->pUpsertTarget ){
sqlite3UpsertAnalyzeTarget(pParse, pTabList, pUpsert);
}
pNx = pUpsert;
do{
pNx->pUpsertSrc = pTabList;
pNx->regData = regData;
pNx->iDataCur = iDataCur;
pNx->iIdxCur = iIdxCur;
if( pNx->pUpsertTarget ){
sqlite3UpsertAnalyzeTarget(pParse, pTabList, pNx);
}
pNx = pNx->pNextUpsert;
}while( pNx!=0 );
}
#endif
@@ -1399,6 +1404,70 @@ int sqlite3ExprReferencesUpdatedColumn(
return w.eCode!=0;
}
/*
** The sqlite3GenerateConstraintChecks() routine usually wants to visit
** the indexes of a table in the order provided in the Table->pIndex list.
** However, sometimes (rarely - when there is an upsert) it wants to visit
** the indexes in a different order. The following data structures accomplish
** this.
**
** The IndexIterator object is used to walk through all of the indexes
** of a table in either Index.pNext order, or in some other order established
** by an array of IndexListTerm objects.
*/
typedef struct IndexListTerm IndexListTerm;
typedef struct IndexIterator IndexIterator;
struct IndexIterator {
int eType; /* 0 for Index.pNext list. 1 for an array of IndexListTerm */
int i; /* Index of the current item from the list */
union {
struct { /* Use this object for eType==0: A Index.pNext list */
Index *pIdx; /* The current Index */
} lx;
struct { /* Use this object for eType==1; Array of IndexListTerm */
int nIdx; /* Size of the array */
IndexListTerm *aIdx; /* Array of IndexListTerms */
} ax;
} u;
};
/* When IndexIterator.eType==1, then each index is an array of instances
** of the following object
*/
struct IndexListTerm {
Index *p; /* The index */
int ix; /* Which entry in the original Table.pIndex list is this index*/
};
/* Return the first index on the list */
static Index *indexIteratorFirst(IndexIterator *pIter, int *pIx){
assert( pIter->i==0 );
if( pIter->eType ){
*pIx = pIter->u.ax.aIdx[0].ix;
return pIter->u.ax.aIdx[0].p;
}else{
*pIx = 0;
return pIter->u.lx.pIdx;
}
}
/* Return the next index from the list. Return NULL when out of indexes */
static Index *indexIteratorNext(IndexIterator *pIter, int *pIx){
if( pIter->eType ){
int i = ++pIter->i;
if( i>=pIter->u.ax.nIdx ){
*pIx = i;
return 0;
}
*pIx = pIter->u.ax.aIdx[i].ix;
return pIter->u.ax.aIdx[i].p;
}else{
++(*pIx);
pIter->u.lx.pIdx = pIter->u.lx.pIdx->pNext;
return pIter->u.lx.pIdx;
}
}
/*
** Generate code to do constraint checks prior to an INSERT or an UPDATE
** on table pTab.
@@ -1507,7 +1576,7 @@ void sqlite3GenerateConstraintChecks(
){
Vdbe *v; /* VDBE under constrution */
Index *pIdx; /* Pointer to one of the indices */
Index *pPk = 0; /* The PRIMARY KEY index */
Index *pPk = 0; /* The PRIMARY KEY index for WITHOUT ROWID tables */
sqlite3 *db; /* Database connection */
int i; /* loop counter */
int ix; /* Index loop counter */
@@ -1515,11 +1584,11 @@ void sqlite3GenerateConstraintChecks(
int onError; /* Conflict resolution strategy */
int seenReplace = 0; /* True if REPLACE is used to resolve INT PK conflict */
int nPkField; /* Number of fields in PRIMARY KEY. 1 for ROWID tables */
Index *pUpIdx = 0; /* Index to which to apply the upsert */
u8 isUpdate; /* True if this is an UPDATE operation */
Upsert *pUpsertClause = 0; /* The specific ON CONFLICT clause for pIdx */
u8 isUpdate; /* True if this is an UPDATE operation */
u8 bAffinityDone = 0; /* True if the OP_Affinity operation has been run */
int upsertBypass = 0; /* Address of Goto to bypass upsert subroutine */
int upsertJump = 0; /* Address of Goto that jumps into upsert subroutine */
int upsertIpkReturn = 0; /* Address of Goto at end of IPK uniqueness check */
int upsertIpkDelay = 0; /* Address of Goto to bypass initial IPK check */
int ipkTop = 0; /* Top of the IPK uniqueness check */
int ipkBottom = 0; /* OP_Goto at the end of the IPK uniqueness check */
/* Variables associated with retesting uniqueness constraints after
@@ -1529,6 +1598,7 @@ void sqlite3GenerateConstraintChecks(
int lblRecheckOk = 0; /* Each recheck jumps to this label if it passes */
Trigger *pTrigger; /* List of DELETE triggers on the table pTab */
int nReplaceTrig = 0; /* Number of replace triggers coded */
IndexIterator sIdxIter; /* Index iterator */
isUpdate = regOldData!=0;
db = pParse->db;
@@ -1726,19 +1796,63 @@ void sqlite3GenerateConstraintChecks(
** list of indexes attached to a table puts all OE_Replace indexes last
** in the list. See sqlite3CreateIndex() for where that happens.
*/
sIdxIter.eType = 0;
sIdxIter.i = 0;
sIdxIter.u.ax.aIdx = 0; /* Silence harmless compiler warning */
sIdxIter.u.lx.pIdx = pTab->pIndex;
if( pUpsert ){
if( pUpsert->pUpsertTarget==0 ){
/* An ON CONFLICT DO NOTHING clause, without a constraint-target.
** Make all unique constraint resolution be OE_Ignore */
assert( pUpsert->pUpsertSet==0 );
overrideError = OE_Ignore;
pUpsert = 0;
}else if( (pUpIdx = pUpsert->pUpsertIdx)!=0 ){
/* If the constraint-target uniqueness check must be run first.
** Jump to that uniqueness check now */
upsertJump = sqlite3VdbeAddOp0(v, OP_Goto);
VdbeComment((v, "UPSERT constraint goes first"));
/* There is just on ON CONFLICT clause and it has no constraint-target */
assert( pUpsert->pNextUpsert==0 );
if( pUpsert->isDoUpdate==0 ){
/* A single ON CONFLICT DO NOTHING clause, without a constraint-target.
** Make all unique constraint resolution be OE_Ignore */
overrideError = OE_Ignore;
pUpsert = 0;
}else{
/* A single ON CONFLICT DO UPDATE. Make all resolutions OE_Update */
overrideError = OE_Update;
}
}else if( pTab->pIndex!=0 ){
/* Otherwise, we'll need to run the IndexListTerm array version of the
** iterator to ensure that all of the ON CONFLICT conditions are
** checked first and in order. */
int nIdx, jj;
u64 nByte;
Upsert *pTerm;
u8 *bUsed;
for(nIdx=0, pIdx=pTab->pIndex; pIdx; pIdx=pIdx->pNext, nIdx++){
assert( aRegIdx[nIdx]>0 );
}
sIdxIter.eType = 1;
sIdxIter.u.ax.nIdx = nIdx;
nByte = (sizeof(IndexListTerm)+1)*nIdx + nIdx;
sIdxIter.u.ax.aIdx = sqlite3DbMallocZero(db, nByte);
if( sIdxIter.u.ax.aIdx==0 ) return; /* OOM */
bUsed = (u8*)&sIdxIter.u.ax.aIdx[nIdx];
pUpsert->pToFree = sIdxIter.u.ax.aIdx;
for(i=0, pTerm=pUpsert; pTerm; pTerm=pTerm->pNextUpsert){
if( pTerm->pUpsertTarget==0 ) break;
if( pTerm->pUpsertIdx==0 ) continue; /* Skip ON CONFLICT for the IPK */
jj = 0;
pIdx = pTab->pIndex;
while( ALWAYS(pIdx!=0) && pIdx!=pTerm->pUpsertIdx ){
pIdx = pIdx->pNext;
jj++;
}
if( bUsed[jj] ) continue; /* Duplicate ON CONFLICT clause ignored */
bUsed[jj] = 1;
sIdxIter.u.ax.aIdx[i].p = pIdx;
sIdxIter.u.ax.aIdx[i].ix = jj;
i++;
}
for(jj=0, pIdx=pTab->pIndex; pIdx; pIdx=pIdx->pNext, jj++){
if( bUsed[jj] ) continue;
sIdxIter.u.ax.aIdx[i].p = pIdx;
sIdxIter.u.ax.aIdx[i].ix = jj;
i++;
}
assert( i==nIdx );
}
}
@@ -1801,11 +1915,20 @@ void sqlite3GenerateConstraintChecks(
}
/* figure out whether or not upsert applies in this case */
if( pUpsert && pUpsert->pUpsertIdx==0 ){
if( pUpsert->pUpsertSet==0 ){
onError = OE_Ignore; /* DO NOTHING is the same as INSERT OR IGNORE */
}else{
onError = OE_Update; /* DO UPDATE */
if( pUpsert ){
pUpsertClause = sqlite3UpsertOfIndex(pUpsert,0);
if( pUpsertClause!=0 ){
if( pUpsertClause->isDoUpdate==0 ){
onError = OE_Ignore; /* DO NOTHING is the same as INSERT OR IGNORE */
}else{
onError = OE_Update; /* DO UPDATE */
}
}
if( pUpsertClause!=pUpsert ){
/* The first ON CONFLICT clause has a conflict target other than
** the IPK. We have to jump ahead to that first ON CONFLICT clause
** and then come back here and deal with the IPK afterwards */
upsertIpkDelay = sqlite3VdbeAddOp0(v, OP_Goto);
}
}
@@ -1912,7 +2035,9 @@ void sqlite3GenerateConstraintChecks(
}
}
sqlite3VdbeResolveLabel(v, addrRowidOk);
if( ipkTop ){
if( pUpsert && pUpsertClause!=pUpsert ){
upsertIpkReturn = sqlite3VdbeAddOp0(v, OP_Goto);
}else if( ipkTop ){
ipkBottom = sqlite3VdbeAddOp0(v, OP_Goto);
sqlite3VdbeJumpHere(v, ipkTop-1);
}
@@ -1925,7 +2050,10 @@ void sqlite3GenerateConstraintChecks(
** This loop also handles the case of the PRIMARY KEY index for a
** WITHOUT ROWID table.
*/
for(ix=0, pIdx=pTab->pIndex; pIdx; pIdx=pIdx->pNext, ix++){
for(pIdx = indexIteratorFirst(&sIdxIter, &ix);
pIdx;
pIdx = indexIteratorNext(&sIdxIter, &ix)
){
int regIdx; /* Range of registers hold conent for pIdx */
int regR; /* Range of registers holding conflicting PK */
int iThisCur; /* Cursor for this UNIQUE index */
@@ -1933,15 +2061,14 @@ void sqlite3GenerateConstraintChecks(
int addrConflictCk; /* First opcode in the conflict check logic */
if( aRegIdx[ix]==0 ) continue; /* Skip indices that do not change */
if( pUpIdx==pIdx ){
addrUniqueOk = upsertJump+1;
upsertBypass = sqlite3VdbeGoto(v, 0);
VdbeComment((v, "Skip upsert subroutine"));
sqlite3VdbeJumpHere(v, upsertJump);
}else{
addrUniqueOk = sqlite3VdbeMakeLabel(pParse);
if( pUpsert ){
pUpsertClause = sqlite3UpsertOfIndex(pUpsert, pIdx);
if( upsertIpkDelay && pUpsertClause==pUpsert ){
sqlite3VdbeJumpHere(v, upsertIpkDelay);
}
}
if( bAffinityDone==0 && (pUpIdx==0 || pUpIdx==pIdx) ){
addrUniqueOk = sqlite3VdbeMakeLabel(pParse);
if( bAffinityDone==0 ){
sqlite3TableAffinity(v, pTab, regNewData+1);
bAffinityDone = 1;
}
@@ -2012,8 +2139,8 @@ void sqlite3GenerateConstraintChecks(
}
/* Figure out if the upsert clause applies to this index */
if( pUpIdx==pIdx ){
if( pUpsert->pUpsertSet==0 ){
if( pUpsertClause ){
if( pUpsertClause->isDoUpdate==0 ){
onError = OE_Ignore; /* DO NOTHING is the same as INSERT OR IGNORE */
}else{
onError = OE_Update; /* DO UPDATE */
@@ -2051,7 +2178,7 @@ void sqlite3GenerateConstraintChecks(
regIdx, pIdx->nKeyCol); VdbeCoverage(v);
/* Generate code to handle collisions */
regR = (pIdx==pPk) ? regIdx : sqlite3GetTempRange(pParse, nPkField);
regR = pIdx==pPk ? regIdx : sqlite3GetTempRange(pParse, nPkField);
if( isUpdate || onError==OE_Replace ){
if( HasRowid(pTab) ){
sqlite3VdbeAddOp2(v, OP_IdxRowid, iThisCur, regR);
@@ -2203,13 +2330,16 @@ void sqlite3GenerateConstraintChecks(
break;
}
}
if( pUpIdx==pIdx ){
sqlite3VdbeGoto(v, upsertJump+1);
sqlite3VdbeJumpHere(v, upsertBypass);
}else{
sqlite3VdbeResolveLabel(v, addrUniqueOk);
}
sqlite3VdbeResolveLabel(v, addrUniqueOk);
if( regR!=regIdx ) sqlite3ReleaseTempRange(pParse, regR, nPkField);
if( pUpsertClause
&& upsertIpkReturn
&& sqlite3UpsertNextIsIPK(pUpsertClause)
){
sqlite3VdbeGoto(v, upsertIpkDelay+1);
sqlite3VdbeJumpHere(v, upsertIpkReturn);
upsertIpkReturn = 0;
}
}
/* If the IPK constraint is a REPLACE, run it last */