1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-07-29 08:01:23 +03:00

Fix RBU so that it does not write rows that should be excluded into partial indexes (corrupting the database).

FossilOrigin-Name: 31eb27f438ad727b095a518bfe0f7ed37cb806fc1e6929b821eddcc6cc9de260
This commit is contained in:
dan
2019-04-11 16:54:20 +00:00
parent 6adba9031c
commit 971194aca3
4 changed files with 172 additions and 15 deletions

86
ext/rbu/rbupartial.test Normal file
View File

@ -0,0 +1,86 @@
# 2019 April 11
#
# The author disclaims copyright to this source code. In place of
# a legal notice, here is a blessing:
#
# May you do good and not evil.
# May you find forgiveness for yourself and forgive others.
# May you share freely, never taking more than you give.
#
#***********************************************************************
#
source [file join [file dirname [info script]] rbu_common.tcl]
set ::testprefix rbupartial
db close
sqlite3_shutdown
sqlite3_config_uri 1
foreach {tn without_rowid a b c d} {
1 "" a b c d
2 "WITHOUT ROWID" aaa bbb ccc ddd
3 "WITHOUT ROWID" "\"hello\"" {"one'two"} {[c]} ddd
4 "WITHOUT ROWID" {`a b`} {"one'two"} {[c c c]} ddd
5 "" a b c {"d""d"}
6 "" {'one''two'} b {"c""c"} {"d""d"}
} {
eval [string map [list \
%WITHOUT_ROWID% $without_rowid %A% $a %B% $b %C% $c %D% $d
] {
reset_db
do_execsql_test $tn.1.0 {
CREATE TABLE t1(%A% PRIMARY KEY, %B%, %C%, %D%) %WITHOUT_ROWID% ;
CREATE INDEX i1b ON t1(%B%);
CREATE INDEX i1b2 ON t1(%B%) WHERE %C%<5;
CREATE INDEX i1b3 ON t1(%B%) WHERE %C%>=5;
CREATE INDEX i1c ON t1(%C%);
CREATE INDEX i1c2 ON t1(%C%) WHERE %C% IS NULL;
CREATE INDEX i1c3 ON t1(%C%) WHERE %C% IS NOT NULL;
CREATE INDEX i1c4 ON t1(%C%) WHERE %D% < 'd';
}
do_execsql_test $tn.1.1 {
INSERT INTO t1 VALUES(0, NULL, NULL, 'a');
INSERT INTO t1 VALUES(1, 2, 3, 'b');
INSERT INTO t1 VALUES(4, 5, 6, 'c');
INSERT INTO t1 VALUES(7, 8, 9, 'd');
}
forcedelete rbu.db
do_test $tn.1.2 {
sqlite3 rbu rbu.db
rbu eval {
CREATE TABLE data_t1(%A%, %B%, %C%, %D%, rbu_control);
INSERT INTO data_t1 VALUES(10, 11, 12, 'e', 0);
INSERT INTO data_t1 VALUES(13, 14, NULL, 'f', 0);
INSERT INTO data_t1 VALUES(0, NULL, NULL, NULL, 1);
INSERT INTO data_t1 VALUES(4, NULL, NULL, NULL, 1);
INSERT INTO data_t1 VALUES(7, NULL, 4, NULL, '..x.');
INSERT INTO data_t1 VALUES(1, 10, NULL, NULL, '.xx.');
}
rbu close
} {}
do_test $tn.1.3 {
run_rbu test.db rbu.db
execsql { PRAGMA integrity_check }
} {ok}
do_execsql_test $tn.1.4 {
SELECT * FROM t1 ORDER BY %A%;
} {
1 10 {} b 7 8 4 d 10 11 12 e 13 14 {} f
}
set step 0
do_rbu_vacuum_test $tn.1.5 0
}]
}
finish_test

View File

@ -240,6 +240,11 @@ struct RbuUpdateStmt {
** it points to an array of flags nTblCol elements in size. The flag is
** set for each column that is either a part of the PK or a part of an
** index. Or clear otherwise.
**
** If there are one or more partial indexes on the table, all fields of
** this array set set to 1. This is because in that case, the module has
** no way to tell which fields will be required to add and remove entries
** from the partial indexes.
**
*/
struct RbuObjIter {
@ -1250,8 +1255,12 @@ static void rbuObjIterCacheIndexedCols(sqlite3rbu *p, RbuObjIter *pIter){
pIter->nIndex = 0;
while( p->rc==SQLITE_OK && SQLITE_ROW==sqlite3_step(pList) ){
const char *zIdx = (const char*)sqlite3_column_text(pList, 1);
int bPartial = sqlite3_column_int(pList, 4);
sqlite3_stmt *pXInfo = 0;
if( zIdx==0 ) break;
if( bPartial ){
memset(pIter->abIndexed, 0x01, sizeof(u8)*pIter->nTblCol);
}
p->rc = prepareFreeAndCollectError(p->dbMain, &pXInfo, &p->zErrmsg,
sqlite3_mprintf("PRAGMA main.index_xinfo = %Q", zIdx)
);
@ -1958,6 +1967,62 @@ static void rbuTmpInsertFunc(
}
}
static char *rbuObjIterGetIndexWhere(sqlite3rbu *p, RbuObjIter *pIter){
sqlite3_stmt *pStmt = 0;
int rc = p->rc;
char *zRet = 0;
if( rc==SQLITE_OK ){
rc = prepareAndCollectError(p->dbMain, &pStmt, &p->zErrmsg,
"SELECT trim(sql) FROM sqlite_master WHERE type='index' AND name=?"
);
}
if( rc==SQLITE_OK ){
int rc2;
rc = sqlite3_bind_text(pStmt, 1, pIter->zIdx, -1, SQLITE_STATIC);
if( rc==SQLITE_OK && SQLITE_ROW==sqlite3_step(pStmt) ){
const char *zSql = (const char*)sqlite3_column_text(pStmt, 0);
if( zSql ){
int nParen = 0; /* Number of open parenthesis */
int i;
for(i=0; zSql[i]; i++){
char c = zSql[i];
if( c=='(' ){
nParen++;
}
else if( c==')' ){
nParen--;
if( nParen==0 ){
i++;
break;
}
}else if( c=='"' || c=='\'' || c=='`' ){
for(i++; 1; i++){
if( zSql[i]==c ){
if( zSql[i+1]!=c ) break;
i++;
}
}
}else if( c=='[' ){
for(i++; 1; i++){
if( zSql[i]==']' ) break;
}
}
}
if( zSql[i] ){
zRet = rbuStrndup(&zSql[i], &rc);
}
}
}
rc2 = sqlite3_finalize(pStmt);
if( rc==SQLITE_OK ) rc = rc2;
}
p->rc = rc;
return zRet;
}
/*
** Ensure that the SQLite statement handles required to update the
** target database object currently indicated by the iterator passed
@ -1987,6 +2052,7 @@ static int rbuObjIterPrepareAll(
char *zImposterPK = 0; /* Primary key declaration for imposter */
char *zWhere = 0; /* WHERE clause on PK columns */
char *zBind = 0;
char *zPart = 0;
int nBind = 0;
assert( pIter->eType!=RBU_PK_VTAB );
@ -1994,6 +2060,7 @@ static int rbuObjIterPrepareAll(
p, pIter, &zImposterCols, &zImposterPK, &zWhere, &nBind
);
zBind = rbuObjIterGetBindlist(p, nBind);
zPart = rbuObjIterGetIndexWhere(p, pIter);
/* Create the imposter table used to write to this index. */
sqlite3_test_control(SQLITE_TESTCTRL_IMPOSTER, p->dbMain, "main", 0, 1);
@ -2026,28 +2093,30 @@ static int rbuObjIterPrepareAll(
char *zSql;
if( rbuIsVacuum(p) ){
zSql = sqlite3_mprintf(
"SELECT %s, 0 AS rbu_control FROM '%q' ORDER BY %s%s",
"SELECT %s, 0 AS rbu_control FROM '%q' %s ORDER BY %s%s",
zCollist,
pIter->zDataTbl,
zCollist, zLimit
zPart, zCollist, zLimit
);
}else
if( pIter->eType==RBU_PK_EXTERNAL || pIter->eType==RBU_PK_NONE ){
zSql = sqlite3_mprintf(
"SELECT %s, rbu_control FROM %s.'rbu_tmp_%q' ORDER BY %s%s",
"SELECT %s, rbu_control FROM %s.'rbu_tmp_%q' %s ORDER BY %s%s",
zCollist, p->zStateDb, pIter->zDataTbl,
zCollist, zLimit
zPart, zCollist, zLimit
);
}else{
zSql = sqlite3_mprintf(
"SELECT %s, rbu_control FROM %s.'rbu_tmp_%q' "
"SELECT %s, rbu_control FROM %s.'rbu_tmp_%q' %s "
"UNION ALL "
"SELECT %s, rbu_control FROM '%q' "
"WHERE typeof(rbu_control)='integer' AND rbu_control!=1 "
"%s %s typeof(rbu_control)='integer' AND rbu_control!=1 "
"ORDER BY %s%s",
zCollist, p->zStateDb, pIter->zDataTbl,
zCollist, p->zStateDb, pIter->zDataTbl, zPart,
zCollist, pIter->zDataTbl,
zPart,
(zPart ? "AND" : "WHERE"),
zCollist, zLimit
);
}
@ -2058,6 +2127,7 @@ static int rbuObjIterPrepareAll(
sqlite3_free(zImposterPK);
sqlite3_free(zWhere);
sqlite3_free(zBind);
sqlite3_free(zPart);
}else{
int bRbuRowid = (pIter->eType==RBU_PK_VTAB)
||(pIter->eType==RBU_PK_NONE)