1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-07-29 08:01:23 +03:00

Fix issues in [/info/1e227ad9f413227f|LIMIT/OFFSET support for virtual tables].

The first problem was reported by
[forum:/forumpost/c243b8f856|forum post c243b8f856].  That report prompted
an enhancement to the generate_series() (also included in this merge) which
in turn identified other similar issues.

FossilOrigin-Name: 5f6c079d847e3664ec5acaf1b3e989efe0d548c211ae4a18936162b36df89065
This commit is contained in:
drh
2024-04-26 19:10:15 +00:00
8 changed files with 328 additions and 45 deletions

View File

@ -376,13 +376,13 @@ static int seriesEof(sqlite3_vtab_cursor *cur){
** parameter. (idxStr is not used in this implementation.) idxNum
** is a bitmask showing which constraints are available:
**
** 1: start=VALUE
** 2: stop=VALUE
** 4: step=VALUE
**
** Also, if bit 8 is set, that means that the series should be output
** in descending order rather than in ascending order. If bit 16 is
** set, then output must appear in ascending order.
** 0x01: start=VALUE
** 0x02: stop=VALUE
** 0x04: step=VALUE
** 0x08: descending order
** 0x10: ascending order
** 0x20: LIMIT VALUE
** 0x40: OFFSET VALUE
**
** This routine should initialize the cursor and position it so that it
** is pointing at the first row, or pointing off the end of the table
@ -396,26 +396,44 @@ static int seriesFilter(
series_cursor *pCur = (series_cursor *)pVtabCursor;
int i = 0;
(void)idxStrUnused;
if( idxNum & 1 ){
if( idxNum & 0x01 ){
pCur->ss.iBase = sqlite3_value_int64(argv[i++]);
}else{
pCur->ss.iBase = 0;
}
if( idxNum & 2 ){
if( idxNum & 0x02 ){
pCur->ss.iTerm = sqlite3_value_int64(argv[i++]);
}else{
pCur->ss.iTerm = 0xffffffff;
}
if( idxNum & 4 ){
if( idxNum & 0x04 ){
pCur->ss.iStep = sqlite3_value_int64(argv[i++]);
if( pCur->ss.iStep==0 ){
pCur->ss.iStep = 1;
}else if( pCur->ss.iStep<0 ){
if( (idxNum & 16)==0 ) idxNum |= 8;
if( (idxNum & 0x10)==0 ) idxNum |= 0x08;
}
}else{
pCur->ss.iStep = 1;
}
if( idxNum & 0x20 ){
sqlite3_int64 iLimit = sqlite3_value_int64(argv[i++]);
sqlite3_int64 iTerm;
if( idxNum & 0x40 ){
sqlite3_int64 iOffset = sqlite3_value_int64(argv[i++]);
if( iOffset>0 ){
pCur->ss.iBase += pCur->ss.iStep*iOffset;
}
}
if( iLimit>=0 ){
iTerm = pCur->ss.iBase + (iLimit - 1)*pCur->ss.iStep;
if( pCur->ss.iStep<0 ){
if( iTerm>pCur->ss.iTerm ) pCur->ss.iTerm = iTerm;
}else{
if( iTerm<pCur->ss.iTerm ) pCur->ss.iTerm = iTerm;
}
}
}
for(i=0; i<argc; i++){
if( sqlite3_value_type(argv[i])==SQLITE_NULL ){
/* If any of the constraints have a NULL value, then return no rows.
@ -426,7 +444,7 @@ static int seriesFilter(
break;
}
}
if( idxNum & 8 ){
if( idxNum & 0x08 ){
pCur->ss.isReversing = pCur->ss.iStep > 0;
}else{
pCur->ss.isReversing = pCur->ss.iStep < 0;
@ -446,10 +464,13 @@ static int seriesFilter(
**
** The query plan is represented by bits in idxNum:
**
** (1) start = $value -- constraint exists
** (2) stop = $value -- constraint exists
** (4) step = $value -- constraint exists
** (8) output in descending order
** 0x01 start = $value -- constraint exists
** 0x02 stop = $value -- constraint exists
** 0x04 step = $value -- constraint exists
** 0x08 output is in descending order
** 0x10 output is in ascending order
** 0x20 LIMIT $value -- constraint exists
** 0x40 OFFSET $value -- constraint exists
*/
static int seriesBestIndex(
sqlite3_vtab *pVTab,
@ -457,10 +478,12 @@ static int seriesBestIndex(
){
int i, j; /* Loop over constraints */
int idxNum = 0; /* The query plan bitmask */
#ifndef ZERO_ARGUMENT_GENERATE_SERIES
int bStartSeen = 0; /* EQ constraint seen on the START column */
#endif
int unusableMask = 0; /* Mask of unusable constraints */
int nArg = 0; /* Number of arguments that seriesFilter() expects */
int aIdx[3]; /* Constraints on start, stop, and step */
int aIdx[5]; /* Constraints on start, stop, step, LIMIT, OFFSET */
const struct sqlite3_index_constraint *pConstraint;
/* This implementation assumes that the start, stop, and step columns
@ -468,28 +491,54 @@ static int seriesBestIndex(
assert( SERIES_COLUMN_STOP == SERIES_COLUMN_START+1 );
assert( SERIES_COLUMN_STEP == SERIES_COLUMN_START+2 );
aIdx[0] = aIdx[1] = aIdx[2] = -1;
aIdx[0] = aIdx[1] = aIdx[2] = aIdx[3] = aIdx[4] = -1;
pConstraint = pIdxInfo->aConstraint;
for(i=0; i<pIdxInfo->nConstraint; i++, pConstraint++){
int iCol; /* 0 for start, 1 for stop, 2 for step */
int iMask; /* bitmask for those column */
int op = pConstraint->op;
if( op>=SQLITE_INDEX_CONSTRAINT_LIMIT
&& op<=SQLITE_INDEX_CONSTRAINT_OFFSET
){
if( pConstraint->usable==0 ){
/* do nothing */
}else if( op==SQLITE_INDEX_CONSTRAINT_LIMIT ){
aIdx[3] = i;
idxNum |= 0x20;
}else{
assert( op==SQLITE_INDEX_CONSTRAINT_OFFSET );
aIdx[4] = i;
idxNum |= 0x40;
}
continue;
}
if( pConstraint->iColumn<SERIES_COLUMN_START ) continue;
iCol = pConstraint->iColumn - SERIES_COLUMN_START;
assert( iCol>=0 && iCol<=2 );
iMask = 1 << iCol;
if( iCol==0 ) bStartSeen = 1;
#ifndef ZERO_ARGUMENT_GENERATE_SERIES
if( iCol==0 && op==SQLITE_INDEX_CONSTRAINT_EQ ){
bStartSeen = 1;
}
#endif
if( pConstraint->usable==0 ){
unusableMask |= iMask;
continue;
}else if( pConstraint->op==SQLITE_INDEX_CONSTRAINT_EQ ){
}else if( op==SQLITE_INDEX_CONSTRAINT_EQ ){
idxNum |= iMask;
aIdx[iCol] = i;
}
}
for(i=0; i<3; i++){
if( aIdx[3]==0 ){
/* Ignore OFFSET if LIMIT is omitted */
idxNum &= ~0x60;
aIdx[4] = 0;
}
for(i=0; i<5; i++){
if( (j = aIdx[i])>=0 ){
pIdxInfo->aConstraintUsage[j].argvIndex = ++nArg;
pIdxInfo->aConstraintUsage[j].omit = !SQLITE_SERIES_CONSTRAINT_VERIFY;
pIdxInfo->aConstraintUsage[j].omit =
!SQLITE_SERIES_CONSTRAINT_VERIFY || i>=3;
}
}
/* The current generate_column() implementation requires at least one
@ -510,19 +559,22 @@ static int seriesBestIndex(
** this plan is unusable */
return SQLITE_CONSTRAINT;
}
if( (idxNum & 3)==3 ){
if( (idxNum & 0x03)==0x03 ){
/* Both start= and stop= boundaries are available. This is the
** the preferred case */
pIdxInfo->estimatedCost = (double)(2 - ((idxNum&4)!=0));
pIdxInfo->estimatedRows = 1000;
if( pIdxInfo->nOrderBy>=1 && pIdxInfo->aOrderBy[0].iColumn==0 ){
if( pIdxInfo->aOrderBy[0].desc ){
idxNum |= 8;
idxNum |= 0x08;
}else{
idxNum |= 16;
idxNum |= 0x10;
}
pIdxInfo->orderByConsumed = 1;
}
}else if( (idxNum & 0x21)==0x21 ){
/* We have start= and LIMIT */
pIdxInfo->estimatedRows = 2500;
}else{
/* If either boundary is missing, we have to generate a huge span
** of numbers. Make this case very expensive so that the query