1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-11-11 01:42:22 +03:00

Enhance the code generator for INSERT INTO ... SELECT so that the SELECT

generates output directly in the registers that INSERT INTO will be using,
in many cases, and OP_SCopy operations can thus be avoided.

FossilOrigin-Name: aa2d8b0e8154dd2f5e2c837dc11ab362b083495b
This commit is contained in:
drh
2014-02-16 01:55:49 +00:00
parent cfc6ca4179
commit 05a86c5c0f
9 changed files with 165 additions and 199 deletions

View File

@@ -148,7 +148,7 @@ void sqlite3TableAffinityStr(Vdbe *v, Table *pTab){
** a statement of the form "INSERT INTO <iDb, pTab> SELECT ..." can
** run without using temporary table for the results of the SELECT.
*/
static int readsTable(Parse *p, int iStartAddr, int iDb, Table *pTab){
static int readsTable(Parse *p, int iDb, Table *pTab){
Vdbe *v = sqlite3GetVdbe(p);
int i;
int iEnd = sqlite3VdbeCurrentAddr(v);
@@ -156,7 +156,7 @@ static int readsTable(Parse *p, int iStartAddr, int iDb, Table *pTab){
VTable *pVTab = IsVirtual(pTab) ? sqlite3GetVTable(p->db, pTab) : 0;
#endif
for(i=iStartAddr; i<iEnd; i++){
for(i=1; i<iEnd; i++){
VdbeOp *pOp = sqlite3VdbeGetOp(v, i);
assert( pOp!=0 );
if( pOp->opcode==OP_OpenRead && pOp->p3==iDb ){
@@ -335,79 +335,6 @@ void sqlite3AutoincrementEnd(Parse *pParse){
#endif /* SQLITE_OMIT_AUTOINCREMENT */
/*
** Generate code for a co-routine that will evaluate a subquery one
** row at a time.
**
** The pSelect parameter is the subquery that the co-routine will evaluation.
** Information about the location of co-routine and the registers it will use
** is returned by filling in the pDest object.
**
** Registers are allocated as follows:
**
** pDest->iSDParm The register holding the next entry-point of the
** co-routine. Run the co-routine to its next breakpoint
** by calling "OP_Yield $X" where $X is pDest->iSDParm.
**
** pDest->iSdst First result register.
**
** pDest->nSdst Number of result registers.
**
** At EOF the first result register will be marked as "undefined" so that
** the caller can know when to stop reading results.
**
** This routine handles all of the register allocation and fills in the
** pDest structure appropriately.
**
** Here is a schematic of the generated code assuming that X is the
** co-routine entry-point register reg[pDest->iSDParm], that EOF is the
** completed flag reg[pDest->iSDParm+1], and R and S are the range of
** registers that hold the result set, reg[pDest->iSdst] through
** reg[pDest->iSdst+pDest->nSdst-1]:
**
** X <- A
** goto B
** A: setup for the SELECT
** loop rows in the SELECT
** load results into registers R..S
** yield X
** end loop
** cleanup after the SELECT
** end co-routine R
** B:
**
** To use this subroutine, the caller generates code as follows:
**
** [ Co-routine generated by this subroutine, shown above ]
** S: yield X, at EOF goto E
** if skip this row, goto C
** if terminate loop, goto E
** deal with this row
** C: goto S
** E:
*/
int sqlite3CodeCoroutine(Parse *pParse, Select *pSelect, SelectDest *pDest){
int regYield; /* Register holding co-routine entry-point */
int addrTop; /* Top of the co-routine */
int rc; /* Result code */
Vdbe *v; /* VDBE under construction */
regYield = ++pParse->nMem;
v = sqlite3GetVdbe(pParse);
addrTop = sqlite3VdbeCurrentAddr(v) + 1;
sqlite3VdbeAddOp3(v, OP_InitCoroutine, regYield, 0, addrTop);
sqlite3SelectDestInit(pDest, SRT_Coroutine, regYield);
rc = sqlite3Select(pParse, pSelect, pDest);
assert( pParse->nErr==0 || rc );
if( pParse->db->mallocFailed && rc==SQLITE_OK ) rc = SQLITE_NOMEM;
if( rc ) return rc;
sqlite3VdbeAddOp1(v, OP_EndCoroutine, regYield);
sqlite3VdbeJumpHere(v, addrTop - 1); /* label B: */
return rc;
}
/* Forward declaration */
static int xferOptimization(
Parse *pParse, /* Parser context */
@@ -531,16 +458,16 @@ void sqlite3Insert(
int iIdxCur = 0; /* First index cursor */
int ipkColumn = -1; /* Column that is the INTEGER PRIMARY KEY */
int endOfLoop; /* Label for the end of the insertion loop */
int useTempTable = 0; /* Store SELECT results in intermediate table */
int srcTab = 0; /* Data comes from this temporary cursor if >=0 */
int addrInsTop = 0; /* Jump to label "D" */
int addrCont = 0; /* Top of insert loop. Label "C" in templates 3 and 4 */
int addrSelect = 0; /* Address of coroutine that implements the SELECT */
SelectDest dest; /* Destination for SELECT on rhs of INSERT */
int iDb; /* Index of database holding TABLE */
Db *pDb; /* The database containing table being inserted into */
int appendFlag = 0; /* True if the insert is likely to be an append */
int withoutRowid; /* 0 for normal table. 1 for WITHOUT ROWID table */
u8 useTempTable = 0; /* Store SELECT results in intermediate table */
u8 appendFlag = 0; /* True if the insert is likely to be an append */
u8 withoutRowid; /* 0 for normal table. 1 for WITHOUT ROWID table */
u8 bIdListInOrder = 1; /* True if IDLIST is in table order */
ExprList *pList = 0; /* List of VALUES() to be inserted */
/* Register allocations */
@@ -652,6 +579,56 @@ void sqlite3Insert(
*/
regAutoinc = autoIncBegin(pParse, iDb, pTab);
/* Allocate registers for holding the rowid of the new row,
** the content of the new row, and the assemblied row record.
*/
regRowid = regIns = pParse->nMem+1;
pParse->nMem += pTab->nCol + 1;
if( IsVirtual(pTab) ){
regRowid++;
pParse->nMem++;
}
regData = regRowid+1;
/* If the INSERT statement included an IDLIST term, then make sure
** all elements of the IDLIST really are columns of the table and
** remember the column indices.
**
** If the table has an INTEGER PRIMARY KEY column and that column
** is named in the IDLIST, then record in the ipkColumn variable
** the index into IDLIST of the primary key column. ipkColumn is
** the index of the primary key as it appears in IDLIST, not as
** is appears in the original table. (The index of the INTEGER
** PRIMARY KEY in the original table is pTab->iPKey.)
*/
if( pColumn ){
for(i=0; i<pColumn->nId; i++){
pColumn->a[i].idx = -1;
}
for(i=0; i<pColumn->nId; i++){
for(j=0; j<pTab->nCol; j++){
if( sqlite3StrICmp(pColumn->a[i].zName, pTab->aCol[j].zName)==0 ){
pColumn->a[i].idx = j;
if( i!=j ) bIdListInOrder = 0;
if( j==pTab->iPKey ){
ipkColumn = i; assert( !withoutRowid );
}
break;
}
}
if( j>=pTab->nCol ){
if( sqlite3IsRowid(pColumn->a[i].zName) && !withoutRowid ){
ipkColumn = i;
}else{
sqlite3ErrorMsg(pParse, "table %S has no column named %s",
pTabList, 0, pColumn->a[i].zName);
pParse->checkSchema = 1;
goto insert_cleanup;
}
}
}
}
/* Figure out how many columns of data are supplied. If the data
** is coming from a SELECT statement, then generate a co-routine that
** produces a single row of the SELECT on each invocation. The
@@ -659,13 +636,24 @@ void sqlite3Insert(
*/
if( pSelect ){
/* Data is coming from a SELECT. Generate a co-routine to run the SELECT */
int rc = sqlite3CodeCoroutine(pParse, pSelect, &dest);
if( rc ) goto insert_cleanup;
int regYield; /* Register holding co-routine entry-point */
int addrTop; /* Top of the co-routine */
int rc; /* Result code */
regYield = ++pParse->nMem;
addrTop = sqlite3VdbeCurrentAddr(v) + 1;
sqlite3VdbeAddOp3(v, OP_InitCoroutine, regYield, 0, addrTop);
sqlite3SelectDestInit(&dest, SRT_Coroutine, regYield);
dest.iSdst = bIdListInOrder ? regData : 0;
dest.nSdst = pTab->nCol;
rc = sqlite3Select(pParse, pSelect, &dest);
regFromSelect = dest.iSdst;
assert( pParse->nErr==0 || rc );
if( rc || db->mallocFailed ) goto insert_cleanup;
sqlite3VdbeAddOp1(v, OP_EndCoroutine, regYield);
sqlite3VdbeJumpHere(v, addrTop - 1); /* label B: */
assert( pSelect->pEList );
nColumn = pSelect->pEList->nExpr;
assert( dest.nSdst==nColumn );
/* Set useTempTable to TRUE if the result of the SELECT statement
** should be written into a temporary table (template 4). Set to
@@ -676,7 +664,7 @@ void sqlite3Insert(
** of the tables being read by the SELECT statement. Also use a
** temp table in the case of row triggers.
*/
if( pTrigger || readsTable(pParse, addrSelect, iDb, pTab) ){
if( pTrigger || readsTable(pParse, iDb, pTab) ){
useTempTable = 1;
}
@@ -725,6 +713,14 @@ void sqlite3Insert(
}
}
/* If there is no IDLIST term but the table has an integer primary
** key, the set the ipkColumn variable to the integer primary key
** column index in the original table definition.
*/
if( pColumn==0 && nColumn>0 ){
ipkColumn = pTab->iPKey;
}
/* Make sure the number of columns in the source data matches the number
** of columns to be inserted into the table.
*/
@@ -743,52 +739,6 @@ void sqlite3Insert(
sqlite3ErrorMsg(pParse, "%d values for %d columns", nColumn, pColumn->nId);
goto insert_cleanup;
}
/* If the INSERT statement included an IDLIST term, then make sure
** all elements of the IDLIST really are columns of the table and
** remember the column indices.
**
** If the table has an INTEGER PRIMARY KEY column and that column
** is named in the IDLIST, then record in the ipkColumn variable
** the index into IDLIST of the primary key column. ipkColumn is
** the index of the primary key as it appears in IDLIST, not as
** is appears in the original table. (The index of the INTEGER
** PRIMARY KEY in the original table is pTab->iPKey.)
*/
if( pColumn ){
for(i=0; i<pColumn->nId; i++){
pColumn->a[i].idx = -1;
}
for(i=0; i<pColumn->nId; i++){
for(j=0; j<pTab->nCol; j++){
if( sqlite3StrICmp(pColumn->a[i].zName, pTab->aCol[j].zName)==0 ){
pColumn->a[i].idx = j;
if( j==pTab->iPKey ){
ipkColumn = i; assert( !withoutRowid );
}
break;
}
}
if( j>=pTab->nCol ){
if( sqlite3IsRowid(pColumn->a[i].zName) && !withoutRowid ){
ipkColumn = i;
}else{
sqlite3ErrorMsg(pParse, "table %S has no column named %s",
pTabList, 0, pColumn->a[i].zName);
pParse->checkSchema = 1;
goto insert_cleanup;
}
}
}
}
/* If there is no IDLIST term but the table has an integer primary
** key, the set the ipkColumn variable to the integer primary key
** column index in the original table definition.
*/
if( pColumn==0 && nColumn>0 ){
ipkColumn = pTab->iPKey;
}
/* Initialize the count of rows to be inserted
*/
@@ -836,17 +786,6 @@ void sqlite3Insert(
addrInsTop = addrCont = sqlite3VdbeAddOp1(v, OP_Yield, dest.iSDParm);
}
/* Allocate registers for holding the rowid of the new row,
** the content of the new row, and the assemblied row record.
*/
regRowid = regIns = pParse->nMem+1;
pParse->nMem += pTab->nCol + 1;
if( IsVirtual(pTab) ){
regRowid++;
pParse->nMem++;
}
regData = regRowid+1;
/* Run the BEFORE and INSTEAD OF triggers, if there are any
*/
endOfLoop = sqlite3VdbeMakeLabel(v);
@@ -930,7 +869,7 @@ void sqlite3Insert(
if( useTempTable ){
sqlite3VdbeAddOp3(v, OP_Column, srcTab, ipkColumn, regRowid);
}else if( pSelect ){
sqlite3VdbeAddOp2(v, OP_SCopy, regFromSelect+ipkColumn, regRowid);
sqlite3VdbeAddOp2(v, OP_Copy, regFromSelect+ipkColumn, regRowid);
}else{
VdbeOp *pOp;
sqlite3ExprCode(pParse, pList->a[ipkColumn].pExpr, regRowid);
@@ -976,8 +915,9 @@ void sqlite3Insert(
/* The value of the INTEGER PRIMARY KEY column is always a NULL.
** Whenever this column is read, the rowid will be substituted
** in its place. Hence, fill this column with a NULL to avoid
** taking up data space with information that will never be used. */
sqlite3VdbeAddOp2(v, OP_Null, 0, iRegStore);
** taking up data space with information that will never be used.
** As there may be shallow copies of this value, make it a soft-NULL */
sqlite3VdbeAddOp1(v, OP_SoftNull, iRegStore);
continue;
}
if( pColumn==0 ){
@@ -994,11 +934,13 @@ void sqlite3Insert(
}
}
if( j<0 || nColumn==0 || (pColumn && j>=pColumn->nId) ){
sqlite3ExprCode(pParse, pTab->aCol[i].pDflt, iRegStore);
sqlite3ExprCodeFactorable(pParse, pTab->aCol[i].pDflt, iRegStore);
}else if( useTempTable ){
sqlite3VdbeAddOp3(v, OP_Column, srcTab, j, iRegStore);
}else if( pSelect ){
sqlite3VdbeAddOp2(v, OP_SCopy, regFromSelect+j, iRegStore);
if( regFromSelect!=regData ){
sqlite3VdbeAddOp2(v, OP_SCopy, regFromSelect+j, iRegStore);
}
}else{
sqlite3ExprCode(pParse, pList->a[j].pExpr, iRegStore);
}