1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-07-29 08:01:23 +03:00

Experimental multi-threaded sorting changes to allow the sorter to begin returning items to the VDBE before all data is sorted.

FossilOrigin-Name: f9d5e09afaf64d68a0e461c1c2f38179bcea4b1f
This commit is contained in:
dan
2014-04-09 20:04:17 +00:00
parent bf20a35d5f
commit d30ab3d9dd
4 changed files with 348 additions and 122 deletions

View File

@ -1,5 +1,5 @@
C Fix\sharmless\scompiler\swarnings. C Experimental\smulti-threaded\ssorting\schanges\sto\sallow\sthe\ssorter\sto\sbegin\sreturning\sitems\sto\sthe\sVDBE\sbefore\sall\sdata\sis\ssorted.
D 2014-04-04T22:44:59.018 D 2014-04-09T20:04:17.324
F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f
F Makefile.in ad0921c4b2780d01868cf69b419a4f102308d125 F Makefile.in ad0921c4b2780d01868cf69b419a4f102308d125
F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23 F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23
@ -286,7 +286,7 @@ F src/vdbeapi.c 0ed6053f947edd0b30f64ce5aeb811872a3450a4
F src/vdbeaux.c d8dc38965507a34b0e150c0d7fc82b02f8cf25ea F src/vdbeaux.c d8dc38965507a34b0e150c0d7fc82b02f8cf25ea
F src/vdbeblob.c 15377abfb59251bccedd5a9c7d014a895f0c04aa F src/vdbeblob.c 15377abfb59251bccedd5a9c7d014a895f0c04aa
F src/vdbemem.c 6fc77594c60f6155404f3f8d71bf36d1fdeb4447 F src/vdbemem.c 6fc77594c60f6155404f3f8d71bf36d1fdeb4447
F src/vdbesort.c 8da916fc74e78edd5bc95653206942e01710ac09 F src/vdbesort.c 26823b626c3231a52e45f5e78a18cb8681bb1b88
F src/vdbetrace.c 6f52bc0c51e144b7efdcfb2a8f771167a8816767 F src/vdbetrace.c 6f52bc0c51e144b7efdcfb2a8f771167a8816767
F src/vtab.c 21b932841e51ebd7d075e2d0ad1415dce8d2d5fd F src/vtab.c 21b932841e51ebd7d075e2d0ad1415dce8d2d5fd
F src/wal.c 76e7fc6de229bea8b30bb2539110f03a494dc3a8 F src/wal.c 76e7fc6de229bea8b30bb2539110f03a494dc3a8
@ -818,7 +818,7 @@ F test/skipscan2.test 5a4db0799c338ddbacb154aaa5589c0254b36a8d
F test/soak.test 0b5b6375c9f4110c828070b826b3b4b0bb65cd5f F test/soak.test 0b5b6375c9f4110c828070b826b3b4b0bb65cd5f
F test/softheap1.test 40562fe6cac6d9827b7b42b86d45aedf12c15e24 F test/softheap1.test 40562fe6cac6d9827b7b42b86d45aedf12c15e24
F test/sort.test 79dc647c4e9b123a64e57b7080b7f9a2df43f87a F test/sort.test 79dc647c4e9b123a64e57b7080b7f9a2df43f87a
F test/sort2.test 21cd865e31adecdc8fc81c8d95431e629676a8d8 F test/sort2.test bbc2eb244fb862141a900a851056d48705b5997b
F test/sort3.test c3f88d233452a129de519de311d109a0ad0da0af F test/sort3.test c3f88d233452a129de519de311d109a0ad0da0af
F test/speed1.test f2974a91d79f58507ada01864c0e323093065452 F test/speed1.test f2974a91d79f58507ada01864c0e323093065452
F test/speed1p.explain d841e650a04728b39e6740296b852dccdca9b2cb F test/speed1p.explain d841e650a04728b39e6740296b852dccdca9b2cb
@ -1163,7 +1163,10 @@ F tool/vdbe_profile.tcl 67746953071a9f8f2f668b73fe899074e2c6d8c1
F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4 F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4
F tool/warnings.sh d1a6de74685f360ab718efda6265994b99bbea01 F tool/warnings.sh d1a6de74685f360ab718efda6265994b99bbea01
F tool/win/sqlite.vsix 030f3eeaf2cb811a3692ab9c14d021a75ce41fff F tool/win/sqlite.vsix 030f3eeaf2cb811a3692ab9c14d021a75ce41fff
P 5e3dfa27c71a666e122e3cf64897038ff8424800 P e54dded2012f0ab486ee138e9bd57c528af33980
R bbcde0d30a2bb025a3c15f4a10b2b404 R 803b4ddf4cddf7e21aeddc04109caaf0
U drh T *branch * threads-experimental
Z 9376e61a8443d421f4f6f69d3d5500ac T *sym-threads-experimental *
T -sym-threads *
U dan
Z 3b5c615396ccbaaa23add5a8103bd906

View File

@ -1 +1 @@
e54dded2012f0ab486ee138e9bd57c528af33980 f9d5e09afaf64d68a0e461c1c2f38179bcea4b1f

View File

@ -96,6 +96,8 @@ typedef struct PmaReader PmaReader; /* Incrementally read one PMA */
typedef struct PmaWriter PmaWriter; /* Incrementally write on PMA */ typedef struct PmaWriter PmaWriter; /* Incrementally write on PMA */
typedef struct SorterRecord SorterRecord; /* A record being sorted */ typedef struct SorterRecord SorterRecord; /* A record being sorted */
typedef struct SortSubtask SortSubtask; /* A sub-task in the sort process */ typedef struct SortSubtask SortSubtask; /* A sub-task in the sort process */
typedef struct SorterFile SorterFile;
typedef struct IncrMerger IncrMerger;
/* /*
@ -105,6 +107,11 @@ typedef struct SortSubtask SortSubtask; /* A sub-task in the sort process */
#define SORT_SUBTASK_TO_PMA 2 /* Xfer pList to Packed-Memory-Array pTemp1 */ #define SORT_SUBTASK_TO_PMA 2 /* Xfer pList to Packed-Memory-Array pTemp1 */
#define SORT_SUBTASK_CONS 3 /* Consolidate multiple PMAs */ #define SORT_SUBTASK_CONS 3 /* Consolidate multiple PMAs */
/*
** A temporary file used by the sorter, together with the offset of the
** end of the valid data stored in it.
*/
struct SorterFile {
sqlite3_file *pFd;              /* File handle, or NULL if not yet opened */
i64 iEof;                       /* Bytes of data stored in pFd */
};
/* /*
** Sorting is divided up into smaller subtasks. Each subtask is controlled ** Sorting is divided up into smaller subtasks. Each subtask is controlled
** by an instance of this object. A Subtask might run in either the main thread ** by an instance of this object. A Subtask might run in either the main thread
@ -145,6 +152,7 @@ struct SortSubtask {
int bDone; /* Set to true by pTask when finished */ int bDone; /* Set to true by pTask when finished */
sqlite3 *db; /* Database connection */ sqlite3 *db; /* Database connection */
VdbeSorter *pSorter; /* Sorter */
KeyInfo *pKeyInfo; /* How to compare records */ KeyInfo *pKeyInfo; /* How to compare records */
UnpackedRecord *pUnpacked; /* Space to unpack a record */ UnpackedRecord *pUnpacked; /* Space to unpack a record */
int pgsz; /* Main database page size */ int pgsz; /* Main database page size */
@ -155,9 +163,8 @@ struct SortSubtask {
int nInMemory; /* Expected size of PMA based on pList */ int nInMemory; /* Expected size of PMA based on pList */
u8 *aListMemory; /* Records memory (or NULL) */ u8 *aListMemory; /* Records memory (or NULL) */
int nPMA; /* Number of PMAs currently in pTemp1 */ int nPMA; /* Number of PMAs currently in file */
i64 iTemp1Off; /* Offset to write to in pTemp1 */ SorterFile file;
sqlite3_file *pTemp1; /* File to write PMAs to, or NULL */
}; };
@ -239,8 +246,10 @@ struct VdbeSorter {
int mnPmaSize; /* Minimum PMA size, in bytes */ int mnPmaSize; /* Minimum PMA size, in bytes */
int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */ int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */
int bUsePMA; /* True if one or more PMAs created */ int bUsePMA; /* True if one or more PMAs created */
int bUseThreads; /* True if one or more PMAs created */
SorterRecord *pRecord; /* Head of in-memory record list */ SorterRecord *pRecord; /* Head of in-memory record list */
MergeEngine *pMerger; /* For final merge of PMAs (by caller) */ PmaReader *pReader; /* Read data from here after Rewind() */
UnpackedRecord *pUnpacked; /* Used by VdbeSorterCompare() */
u8 *aMemory; /* Block of memory to alloc records from */ u8 *aMemory; /* Block of memory to alloc records from */
int iMemory; /* Offset of first free byte in aMemory */ int iMemory; /* Offset of first free byte in aMemory */
int nMemory; /* Size of aMemory allocation in bytes */ int nMemory; /* Size of aMemory allocation in bytes */
@ -265,6 +274,16 @@ struct PmaReader {
u8 *aBuffer; /* Current read buffer */ u8 *aBuffer; /* Current read buffer */
int nBuffer; /* Size of read buffer in bytes */ int nBuffer; /* Size of read buffer in bytes */
u8 *aMap; /* Pointer to mapping of entire file */ u8 *aMap; /* Pointer to mapping of entire file */
IncrMerger *pIncr; /* Incremental merger */
};
/*
** State for an incremental merge. Keys are pulled from pMerger and written
** to aFile[1] (see vdbeIncrPopulate()) while the owning PmaReader reads
** previously written keys back from aFile[0]. vdbeIncrSwap() exchanges the
** two files each time the reader exhausts aFile[0].
*/
struct IncrMerger {
int mxSz;                       /* Maximum size of files */
SortSubtask *pTask;             /* Task that owns this merger */
int bEof;                       /* Set to true when merge is finished */
SorterFile aFile[2];            /* aFile[0] for reading, [1] for writing */
MergeEngine *pMerger;           /* Merge engine thread reads data from */
SQLiteThread *pThread;          /* Thread currently populating aFile[1] */
}; };
/* /*
@ -326,6 +345,9 @@ struct SorterRecord {
/* Maximum number of PMAs that a single MergeEngine can merge */ /* Maximum number of PMAs that a single MergeEngine can merge */
#define SORTER_MAX_MERGE_COUNT 16 #define SORTER_MAX_MERGE_COUNT 16
static int vdbeIncrSwap(IncrMerger*);
static void vdbeIncrFree(IncrMerger*);
/* /*
** Free all memory belonging to the PmaReader object passed as the second ** Free all memory belonging to the PmaReader object passed as the second
** argument. All structure fields are set to zero before returning. ** argument. All structure fields are set to zero before returning.
@ -334,6 +356,7 @@ static void vdbePmaReaderClear(PmaReader *pIter){
sqlite3_free(pIter->aAlloc); sqlite3_free(pIter->aAlloc);
sqlite3_free(pIter->aBuffer); sqlite3_free(pIter->aBuffer);
if( pIter->aMap ) sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap); if( pIter->aMap ) sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap);
if( pIter->pIncr ) vdbeIncrFree(pIter->pIncr);
memset(pIter, 0, sizeof(PmaReader)); memset(pIter, 0, sizeof(PmaReader));
} }
@ -400,7 +423,7 @@ static int vdbePmaReadBlob(
/* Extend the p->aAlloc[] allocation if required. */ /* Extend the p->aAlloc[] allocation if required. */
if( p->nAlloc<nByte ){ if( p->nAlloc<nByte ){
u8 *aNew; u8 *aNew;
int nNew = p->nAlloc*2; int nNew = MAX(128, p->nAlloc*2);
while( nByte>nNew ) nNew = nNew*2; while( nByte>nNew ) nNew = nNew*2;
aNew = sqlite3Realloc(p->aAlloc, nNew); aNew = sqlite3Realloc(p->aAlloc, nNew);
if( !aNew ) return SQLITE_NOMEM; if( !aNew ) return SQLITE_NOMEM;
@ -464,22 +487,70 @@ static int vdbePmaReadVarint(PmaReader *p, u64 *pnOut){
return SQLITE_OK; return SQLITE_OK;
} }
/*
** Attempt to obtain a mapping of the entire contents of sorter file pFile
** via sqlite3OsFetch(), storing the result in *pp.
**
** No mapping is attempted if the file is larger than the connection's
** nMaxSorterMmap limit. In that case *pp is left untouched and SQLITE_OK
** is returned, so callers should initialise *pp before calling. Otherwise
** the return value is whatever sqlite3OsFetch() returns.
*/
static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){
  if( pFile->iEof>(i64)(pTask->db->nMaxSorterMmap) ){
    /* Too large to map - the caller falls back to buffered reads. */
    return SQLITE_OK;
  }
  return sqlite3OsFetch(pFile->pFd, 0, pFile->iEof, (void**)pp);
}
/*
** Re-point incremental reader pIter at the start of its IncrMerger's
** current input file (aFile[0]).
**
** Any mapping held on the previous file is released first. The new file
** is then mapped if vdbeSorterMapFile() permits it; if no mapping results
** and the reader has no read buffer yet, a buffer of one page (pTask->pgsz
** bytes) is allocated.
**
** Returns SQLITE_OK on success, SQLITE_NOMEM if the buffer cannot be
** allocated, or the error code returned by vdbeSorterMapFile().
*/
static int vdbePmaReaderReinit(PmaReader *pIter){
  IncrMerger *pIncr = pIter->pIncr;
  SortSubtask *pTask = pIncr->pTask;
  SorterFile *pIn = &pIncr->aFile[0];
  int rc;

  assert( pIncr->bEof==0 );

  /* Drop the mapping of the file that was being read until now. */
  if( pIter->aMap ){
    sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap);
    pIter->aMap = 0;
  }

  /* Start reading the new input file from its first byte. */
  pIter->pFile = pIn->pFd;
  pIter->iEof = pIn->iEof;
  pIter->iReadOff = 0;

  rc = vdbeSorterMapFile(pTask, pIn, &pIter->aMap);
  if( rc!=SQLITE_OK ) return rc;

  /* Either a mapping or an existing buffer is sufficient to read with. */
  if( pIter->aMap || pIter->aBuffer ) return SQLITE_OK;

  pIter->aBuffer = (u8*)sqlite3Malloc(pTask->pgsz);
  pIter->nBuffer = pTask->pgsz;
  return pIter->aBuffer ? SQLITE_OK : SQLITE_NOMEM;
}
/* /*
** Advance iterator pIter to the next key in its PMA. Return SQLITE_OK if ** Advance iterator pIter to the next key in its PMA. Return SQLITE_OK if
** no error occurs, or an SQLite error code if one does. ** no error occurs, or an SQLite error code if one does.
*/ */
static int vdbePmaReaderNext(PmaReader *pIter){ static int vdbePmaReaderNext(PmaReader *pIter){
int rc; /* Return Code */ int rc = SQLITE_OK; /* Return Code */
u64 nRec = 0; /* Size of record in bytes */ u64 nRec = 0; /* Size of record in bytes */
if( pIter->iReadOff>=pIter->iEof ){ if( pIter->iReadOff>=pIter->iEof ){
/* This is an EOF condition */ int bEof = 1;
vdbePmaReaderClear(pIter); if( pIter->pIncr ){
return SQLITE_OK; rc = vdbeIncrSwap(pIter->pIncr);
if( rc==SQLITE_OK && pIter->pIncr->bEof==0 ){
rc = vdbePmaReaderReinit(pIter);
bEof = 0;
}
}
if( bEof ){
/* This is an EOF condition */
vdbePmaReaderClear(pIter);
return rc;
}
} }
rc = vdbePmaReadVarint(pIter, &nRec); if( rc==SQLITE_OK ){
rc = vdbePmaReadVarint(pIter, &nRec);
}
if( rc==SQLITE_OK ){ if( rc==SQLITE_OK ){
pIter->nKey = (int)nRec; pIter->nKey = (int)nRec;
rc = vdbePmaReadBlob(pIter, (int)nRec, &pIter->aKey); rc = vdbePmaReadBlob(pIter, (int)nRec, &pIter->aKey);
@ -493,10 +564,14 @@ static int vdbePmaReaderNext(PmaReader *pIter){
** starting at offset iStart and ending at offset iEof-1. This function ** starting at offset iStart and ending at offset iEof-1. This function
** leaves the iterator pointing to the first key in the PMA (or EOF if the ** leaves the iterator pointing to the first key in the PMA (or EOF if the
** PMA is empty). ** PMA is empty).
**
** If the pnByte parameter is NULL, then it is assumed that the file
** contains a single PMA, and that that PMA omits the initial length varint.
*/ */
static int vdbePmaReaderInit( static int vdbePmaReaderInit(
SortSubtask *pTask, /* Thread context */ SortSubtask *pTask, /* Task context */
i64 iStart, /* Start offset in pTask->pTemp1 */ SorterFile *pFile, /* Sorter file to read from */
i64 iStart, /* Start offset in pFile */
PmaReader *pIter, /* Iterator to populate */ PmaReader *pIter, /* Iterator to populate */
i64 *pnByte /* IN/OUT: Increment this value by PMA size */ i64 *pnByte /* IN/OUT: Increment this value by PMA size */
){ ){
@ -504,18 +579,18 @@ static int vdbePmaReaderInit(
int nBuf = pTask->pgsz; int nBuf = pTask->pgsz;
void *pMap = 0; /* Mapping of temp file */ void *pMap = 0; /* Mapping of temp file */
assert( pTask->iTemp1Off>iStart ); assert( pFile->iEof>iStart );
assert( pIter->aAlloc==0 ); assert( pIter->aAlloc==0 );
assert( pIter->aBuffer==0 ); assert( pIter->aBuffer==0 );
pIter->pFile = pTask->pTemp1; pIter->pFile = pFile->pFd;
pIter->iReadOff = iStart; pIter->iReadOff = iStart;
pIter->nAlloc = 128; pIter->nAlloc = 128;
pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc); pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc);
if( pIter->aAlloc ){ if( pIter->aAlloc ){
/* Try to xFetch() a mapping of the entire temp file. If this is possible, /* Try to xFetch() a mapping of the entire temp file. If this is possible,
** the PMA will be read via the mapping. Otherwise, use xRead(). */ ** the PMA will be read via the mapping. Otherwise, use xRead(). */
if( pTask->iTemp1Off<=(i64)(pTask->db->nMaxSorterMmap) ){ if( pFile->iEof<=(i64)(pTask->db->nMaxSorterMmap) ){
rc = sqlite3OsFetch(pIter->pFile, 0, pTask->iTemp1Off, &pMap); rc = sqlite3OsFetch(pIter->pFile, 0, pFile->iEof, &pMap);
} }
}else{ }else{
rc = SQLITE_NOMEM; rc = SQLITE_NOMEM;
@ -533,12 +608,12 @@ static int vdbePmaReaderInit(
int iBuf = iStart % nBuf; int iBuf = iStart % nBuf;
if( iBuf ){ if( iBuf ){
int nRead = nBuf - iBuf; int nRead = nBuf - iBuf;
if( (iStart + nRead) > pTask->iTemp1Off ){ if( (iStart + nRead) > pFile->iEof ){
nRead = (int)(pTask->iTemp1Off - iStart); nRead = (int)(pFile->iEof - iStart);
} }
rc = sqlite3OsRead( rc = sqlite3OsRead(
pTask->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart
); );
assert( rc!=SQLITE_IOERR_SHORT_READ ); assert( rc!=SQLITE_IOERR_SHORT_READ );
} }
} }
@ -547,7 +622,7 @@ static int vdbePmaReaderInit(
if( rc==SQLITE_OK ){ if( rc==SQLITE_OK ){
u64 nByte; /* Size of PMA in bytes */ u64 nByte; /* Size of PMA in bytes */
pIter->iEof = pTask->iTemp1Off; pIter->iEof = pFile->iEof;
rc = vdbePmaReadVarint(pIter, &nByte); rc = vdbePmaReadVarint(pIter, &nByte);
pIter->iEof = pIter->iReadOff + nByte; pIter->iEof = pIter->iReadOff + nByte;
*pnByte += nByte; *pnByte += nByte;
@ -669,11 +744,13 @@ int sqlite3VdbeSorterInit(
pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt); pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
pSorter->nTask = nWorker + 1; pSorter->nTask = nWorker + 1;
pSorter->bUseThreads = (pSorter->nTask>1);
for(i=0; i<pSorter->nTask; i++){ for(i=0; i<pSorter->nTask; i++){
SortSubtask *pTask = &pSorter->aTask[i]; SortSubtask *pTask = &pSorter->aTask[i];
pTask->pKeyInfo = pKeyInfo; pTask->pKeyInfo = pKeyInfo;
pTask->pgsz = pgsz; pTask->pgsz = pgsz;
pTask->db = db; pTask->db = db;
pTask->pSorter = pSorter;
} }
if( !sqlite3TempInMemory(db) ){ if( !sqlite3TempInMemory(db) ){
@ -723,9 +800,10 @@ static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){
pTask->aListMemory = 0; pTask->aListMemory = 0;
} }
pTask->pList = 0; pTask->pList = 0;
if( pTask->pTemp1 ){ if( pTask->file.pFd ){
sqlite3OsCloseFree(pTask->pTemp1); sqlite3OsCloseFree(pTask->file.pFd);
pTask->pTemp1 = 0; pTask->file.pFd = 0;
pTask->file.iEof = 0;
} }
} }
@ -761,7 +839,8 @@ static MergeEngine *vdbeMergeEngineNew(int nIter){
int nByte; /* Total bytes of space to allocate */ int nByte; /* Total bytes of space to allocate */
MergeEngine *pNew; /* Pointer to allocated object to return */ MergeEngine *pNew; /* Pointer to allocated object to return */
assert( nIter<=SORTER_MAX_MERGE_COUNT ); /* assert( nIter<=SORTER_MAX_MERGE_COUNT ); */
while( N<nIter ) N += N; while( N<nIter ) N += N;
nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader)); nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader));
@ -793,8 +872,11 @@ static void vdbeMergeEngineFree(MergeEngine *pMerger){
void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){ void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
int i; int i;
(void)vdbeSorterJoinAll(pSorter, SQLITE_OK); (void)vdbeSorterJoinAll(pSorter, SQLITE_OK);
vdbeMergeEngineFree(pSorter->pMerger); if( pSorter->pReader ){
pSorter->pMerger = 0; vdbePmaReaderClear(pSorter->pReader);
sqlite3DbFree(db, pSorter->pReader);
pSorter->pReader = 0;
}
for(i=0; i<pSorter->nTask; i++){ for(i=0; i<pSorter->nTask; i++){
SortSubtask *pTask = &pSorter->aTask[i]; SortSubtask *pTask = &pSorter->aTask[i];
vdbeSortSubtaskCleanup(db, pTask); vdbeSortSubtaskCleanup(db, pTask);
@ -806,6 +888,8 @@ void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
pSorter->nInMemory = 0; pSorter->nInMemory = 0;
pSorter->bUsePMA = 0; pSorter->bUsePMA = 0;
pSorter->iMemory = 0; pSorter->iMemory = 0;
sqlite3DbFree(db, pSorter->pUnpacked);
pSorter->pUnpacked = 0;
} }
/* /*
@ -815,7 +899,6 @@ void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){
VdbeSorter *pSorter = pCsr->pSorter; VdbeSorter *pSorter = pCsr->pSorter;
if( pSorter ){ if( pSorter ){
sqlite3VdbeSorterReset(db, pSorter); sqlite3VdbeSorterReset(db, pSorter);
vdbeMergeEngineFree(pSorter->pMerger);
sqlite3_free(pSorter->aMemory); sqlite3_free(pSorter->aMemory);
sqlite3DbFree(db, pSorter); sqlite3DbFree(db, pSorter);
pCsr->pSorter = 0; pCsr->pSorter = 0;
@ -1053,17 +1136,17 @@ static int vdbeSorterListToPMA(SortSubtask *pTask){
assert( pTask->nInMemory>0 ); assert( pTask->nInMemory>0 );
/* If the first temporary PMA file has not been opened, open it now. */ /* If the first temporary PMA file has not been opened, open it now. */
if( pTask->pTemp1==0 ){ if( pTask->file.pFd==0 ){
rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->pTemp1); rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file.pFd);
assert( rc!=SQLITE_OK || pTask->pTemp1 ); assert( rc!=SQLITE_OK || pTask->file.pFd );
assert( pTask->iTemp1Off==0 ); assert( pTask->file.iEof==0 );
assert( pTask->nPMA==0 ); assert( pTask->nPMA==0 );
} }
/* Try to get the file to memory map */ /* Try to get the file to memory map */
if( rc==SQLITE_OK ){ if( rc==SQLITE_OK ){
vdbeSorterExtendFile(pTask->db, vdbeSorterExtendFile(pTask->db,
pTask->pTemp1, pTask->iTemp1Off + pTask->nInMemory + 9 pTask->file.pFd, pTask->file.iEof + pTask->nInMemory + 9
); );
} }
@ -1071,8 +1154,8 @@ static int vdbeSorterListToPMA(SortSubtask *pTask){
SorterRecord *p; SorterRecord *p;
SorterRecord *pNext = 0; SorterRecord *pNext = 0;
vdbePmaWriterInit(pTask->pTemp1, &writer, pTask->pgsz, vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pgsz,
pTask->iTemp1Off); pTask->file.iEof);
pTask->nPMA++; pTask->nPMA++;
vdbePmaWriteVarint(&writer, pTask->nInMemory); vdbePmaWriteVarint(&writer, pTask->nInMemory);
for(p=pTask->pList; p; p=pNext){ for(p=pTask->pList; p; p=pNext){
@ -1082,7 +1165,7 @@ static int vdbeSorterListToPMA(SortSubtask *pTask){
if( pTask->aListMemory==0 ) sqlite3_free(p); if( pTask->aListMemory==0 ) sqlite3_free(p);
} }
pTask->pList = p; pTask->pList = p;
rc = vdbePmaWriterFinish(&writer, &pTask->iTemp1Off); rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof);
} }
assert( pTask->pList==0 || rc!=SQLITE_OK ); assert( pTask->pList==0 || rc!=SQLITE_OK );
@ -1164,6 +1247,23 @@ static int vdbeSorterNext(
return rc; return rc;
} }
#if 0
static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){
i64 t;
int iTask = (pTask - pTask->pSorter->aTask);
sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t);
fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent);
}
static void vdbeSorterRewindDebug(sqlite3 *db, const char *zEvent){
i64 t;
sqlite3OsCurrentTimeInt64(db->pVfs, &t);
fprintf(stderr, "%lld:X %s\n", t, zEvent);
}
#else
# define vdbeSorterWorkDebug(x,y)
# define vdbeSorterRewindDebug(x,y)
#endif
/* /*
** The main routine for sorter-thread operations. ** The main routine for sorter-thread operations.
*/ */
@ -1177,6 +1277,8 @@ static void *vdbeSortSubtaskMain(void *pCtx){
); );
assert( pTask->bDone==0 ); assert( pTask->bDone==0 );
vdbeSorterWorkDebug(pTask, "enter");
if( pTask->pUnpacked==0 ){ if( pTask->pUnpacked==0 ){
char *pFree; char *pFree;
pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord( pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
@ -1211,7 +1313,7 @@ static void *vdbeSortSubtaskMain(void *pCtx){
/* Open a second temp file to write merged data to */ /* Open a second temp file to write merged data to */
rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTemp2); rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTemp2);
if( rc==SQLITE_OK ){ if( rc==SQLITE_OK ){
vdbeSorterExtendFile(pTask->db, pTemp2, pTask->iTemp1Off); vdbeSorterExtendFile(pTask->db, pTemp2, pTask->file.iEof);
}else{ }else{
vdbeMergeEngineFree(pMerger); vdbeMergeEngineFree(pMerger);
break; break;
@ -1231,9 +1333,9 @@ static void *vdbeSortSubtaskMain(void *pCtx){
int iIter; int iIter;
for(iIter=0; iIter<SORTER_MAX_MERGE_COUNT; iIter++){ for(iIter=0; iIter<SORTER_MAX_MERGE_COUNT; iIter++){
PmaReader *pIter = &pMerger->aIter[iIter]; PmaReader *pIter = &pMerger->aIter[iIter];
rc = vdbePmaReaderInit(pTask, iReadOff, pIter, &nOut); rc = vdbePmaReaderInit(pTask, &pTask->file, iReadOff, pIter, &nOut);
iReadOff = pIter->iEof; iReadOff = pIter->iEof;
if( iReadOff>=pTask->iTemp1Off || rc!=SQLITE_OK ) break; if( iReadOff>=pTask->file.iEof || rc!=SQLITE_OK ) break;
} }
for(iIter=pMerger->nTree-1; rc==SQLITE_OK && iIter>0; iIter--){ for(iIter=pMerger->nTree-1; rc==SQLITE_OK && iIter>0; iIter--){
rc = vdbeSorterDoCompare(pTask, pMerger, iIter); rc = vdbeSorterDoCompare(pTask, pMerger, iIter);
@ -1253,10 +1355,10 @@ static void *vdbeSortSubtaskMain(void *pCtx){
} }
vdbeMergeEngineFree(pMerger); vdbeMergeEngineFree(pMerger);
sqlite3OsCloseFree(pTask->pTemp1); sqlite3OsCloseFree(pTask->file.pFd);
pTask->pTemp1 = pTemp2; pTask->file.pFd = pTemp2;
pTask->nPMA = (i / SORTER_MAX_MERGE_COUNT); pTask->nPMA = (i / SORTER_MAX_MERGE_COUNT);
pTask->iTemp1Off = iWriteOff; pTask->file.iEof = iWriteOff;
} }
}else{ }else{
/* Sort the pTask->pList list */ /* Sort the pTask->pList list */
@ -1267,10 +1369,10 @@ static void *vdbeSortSubtaskMain(void *pCtx){
#ifdef SQLITE_DEBUG #ifdef SQLITE_DEBUG
i64 nExpect = pTask->nInMemory i64 nExpect = pTask->nInMemory
+ sqlite3VarintLen(pTask->nInMemory) + sqlite3VarintLen(pTask->nInMemory)
+ pTask->iTemp1Off; + pTask->file.iEof;
#endif #endif
rc = vdbeSorterListToPMA(pTask); rc = vdbeSorterListToPMA(pTask);
assert( rc!=SQLITE_OK || (nExpect==pTask->iTemp1Off) ); assert( rc!=SQLITE_OK || (nExpect==pTask->file.iEof) );
} }
} }
@ -1280,6 +1382,7 @@ static void *vdbeSortSubtaskMain(void *pCtx){
assert( pTask->pUnpacked->errCode==SQLITE_NOMEM ); assert( pTask->pUnpacked->errCode==SQLITE_NOMEM );
rc = SQLITE_NOMEM; rc = SQLITE_NOMEM;
} }
vdbeSorterWorkDebug(pTask, "exit");
return SQLITE_INT_TO_PTR(rc); return SQLITE_INT_TO_PTR(rc);
} }
@ -1480,6 +1583,164 @@ static int vdbeSorterCountPMA(VdbeSorter *pSorter){
return nPMA; return nPMA;
} }
/*
** Pull keys from pIncr->pMerger and write them, in order, to
** pIncr->aFile[1]. The data written uses the same layout as a regular PMA
** except that no number-of-bytes varint precedes the first record.
**
** Writing stops when the merge engine has no more keys, or when adding
** the next key would take the file past pIncr->mxSz bytes (at least one
** key is always written if one is available). On return aFile[1].iEof
** holds the offset reported by vdbePmaWriterFinish().
**
** Returns SQLITE_OK, or the first error reported by vdbeSorterNext() or
** by the writer.
*/
static int vdbeIncrPopulate(IncrMerger *pIncr){
  SorterFile *pOut = &pIncr->aFile[1];
  MergeEngine *pMerger = pIncr->pMerger;
  PmaWriter writer;
  int rc = SQLITE_OK;
  int rcFinish;

  assert( pIncr->bEof==0 );
  vdbePmaWriterInit(pOut->pFd, &writer, pIncr->pTask->pgsz, 0);

  for(;;){
    int bUnused;
    PmaReader *pNext = &pMerger->aIter[ pMerger->aTree[1] ];
    int nKey = pNext->nKey;
    i64 nWritten = writer.iWriteOff + writer.iBufEnd;

    /* Stop if the input is exhausted ... */
    if( pNext->pFile==0 ) break;
    /* ... or if the output already holds data and this key will not fit. */
    if( nWritten!=0 && (nWritten + nKey)>pIncr->mxSz ) break;

    /* Append the key, then advance the merge engine to the next one. */
    vdbePmaWriteVarint(&writer, nKey);
    vdbePmaWriteBlob(&writer, pNext->aKey, nKey);
    rc = vdbeSorterNext(pIncr->pTask, pMerger, &bUnused);
    if( rc!=SQLITE_OK ) break;
  }

  /* Always finish the writer; an earlier error takes precedence. */
  rcFinish = vdbePmaWriterFinish(&writer, &pOut->iEof);
  return (rc!=SQLITE_OK) ? rc : rcFinish;
}
/*
** Thread entry point used to run vdbeIncrPopulate() in the background.
** pCtx is the IncrMerger to populate; the integer result code is returned
** encoded as a pointer.
*/
static void *vdbeIncrPopulateThreadMain(void *pCtx){
  int rc = vdbeIncrPopulate((IncrMerger*)pCtx);
  return SQLITE_INT_TO_PTR(rc);
}
/*
** Start populating pIncr->aFile[1]. If the owning sorter has bUseThreads
** set, a new thread running vdbeIncrPopulateThreadMain() is created and
** its handle stored in pIncr->pThread. Otherwise vdbeIncrPopulate() is
** run directly by the caller. No thread may already be attached to pIncr.
**
** Returns the result of thread creation, or of the synchronous populate.
*/
static int vdbeIncrBgPopulate(IncrMerger *pIncr){
  assert( pIncr->pThread==0 );
  if( pIncr->pTask->pSorter->bUseThreads ){
    return sqlite3ThreadCreate(
        &pIncr->pThread, vdbeIncrPopulateThreadMain, (void*)pIncr
    );
  }
  return vdbeIncrPopulate(pIncr);
}
/*
** Called when the reader has consumed all of pIncr->aFile[0]. Any thread
** currently populating aFile[1] is joined, then the two files are
** exchanged so that the freshly written data becomes the read file.
**
** If the new read file is empty the merge is complete and pIncr->bEof is
** set. Otherwise population of the new aFile[1] is started via
** vdbeIncrBgPopulate().
**
** Returns SQLITE_OK, or an error from the join, from the joined thread,
** or from starting the next populate. The files are not exchanged if the
** join step reports an error.
*/
static int vdbeIncrSwap(IncrMerger *pIncr){
  int rc = SQLITE_OK;
  SorterFile tmp;

  if( pIncr->pThread ){
    void *pRet;
    rc = sqlite3ThreadJoin(pIncr->pThread, &pRet);
    pIncr->pThread = 0;
    if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
  }
  if( rc!=SQLITE_OK ) return rc;

  /* Exchange the read and write files. */
  tmp = pIncr->aFile[0];
  pIncr->aFile[0] = pIncr->aFile[1];
  pIncr->aFile[1] = tmp;

  if( pIncr->aFile[0].iEof==0 ){
    /* Nothing was written last time round: the merge is finished. */
    pIncr->bEof = 1;
    return SQLITE_OK;
  }
  return vdbeIncrBgPopulate(pIncr);
}
/*
** Release all resources held by incremental merger pIncr: wait for any
** populate thread still attached to it, close whichever of its two temp
** files are open, free its merge engine and finally the object itself.
** pIncr must not be NULL.
*/
static void vdbeIncrFree(IncrMerger *pIncr){
  int iFile;

  if( pIncr->pThread ){
    void *pRet;
    /* The thread's result is of no interest during teardown. */
    sqlite3ThreadJoin(pIncr->pThread, &pRet);
  }

  for(iFile=0; iFile<(int)(sizeof(pIncr->aFile)/sizeof(pIncr->aFile[0])); iFile++){
    sqlite3_file *pFd = pIncr->aFile[iFile].pFd;
    if( pFd ) sqlite3OsCloseFree(pFd);
  }

  vdbeMergeEngineFree(pIncr->pMerger);
  sqlite3_free(pIncr);
}
/*
** Populate iterator *pIter so that it may be used to iterate, using the
** incremental merge method, through all keys stored in the PMAs of every
** subtask of pSorter.
**
** A single MergeEngine is built with one PmaReader per PMA across all
** tasks. It is wrapped in a new IncrMerger (owned by task 0) whose two
** temp files are each limited to half of pSorter->mxPmaSize. The first
** batch of keys is then produced and pIter advanced to the first key.
**
** Returns SQLITE_OK on success or an SQLite error code. Ownership on
** error:
**
**   * If the failure occurs before the IncrMerger is allocated, the merge
**     engine is freed here and pIter->pIncr is left NULL.
**
**   * Otherwise the IncrMerger (which owns the merge engine and any temp
**     files opened so far) is attached to pIter->pIncr, and is released
**     when the caller clears pIter with vdbePmaReaderClear().
*/
static int vdbePmaReaderIncrInit(VdbeSorter *pSorter, PmaReader *pIter){
  SortSubtask *pTask0 = &pSorter->aTask[0];
  int rc = SQLITE_OK;
  MergeEngine *pMerger = 0;
  IncrMerger *pIncr = 0;
  int i;
  int nPMA = 0;

  /* Count the PMAs held by all tasks; each needs its own sub-reader. */
  for(i=0; i<pSorter->nTask; i++){
    nPMA += pSorter->aTask[i].nPMA;
  }

  pMerger = vdbeMergeEngineNew(nPMA);
  if( pMerger==0 ){
    rc = SQLITE_NOMEM;
  }else{
    int iIter = 0;

    /* Initialise one sub-reader per PMA. Both loops stop at the first
    ** error so that the error code is not overwritten by a later call and
    ** the end offset of a failed reader is never used as the start of the
    ** next PMA. */
    for(i=0; rc==SQLITE_OK && i<pSorter->nTask; i++){
      int iPMA;
      i64 iReadOff = 0;
      SortSubtask *pTask = &pSorter->aTask[i];
      for(iPMA=0; rc==SQLITE_OK && iPMA<pTask->nPMA; iPMA++){
        i64 nDummy = 0;
        PmaReader *pSub = &pMerger->aIter[iIter++];
        rc = vdbePmaReaderInit(pTask, &pTask->file, iReadOff, pSub, &nDummy);
        iReadOff = pSub->iEof;
      }
    }

    /* Build the comparison tree from the leaves towards the root. */
    for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
      rc = vdbeSorterDoCompare(pTask0, pMerger, i);
    }
  }

  if( rc==SQLITE_OK ){
    pIncr = (IncrMerger*)sqlite3_malloc(sizeof(IncrMerger));
    if( pIncr==0 ){
      rc = SQLITE_NOMEM;
    }else{
      memset(pIncr, 0, sizeof(IncrMerger));
      pIncr->mxSz = (pSorter->mxPmaSize / 2);
      pIncr->pMerger = pMerger;
      pIncr->pTask = pTask0;
    }
  }

  /* Nothing has taken ownership of the merge engine if no IncrMerger was
  ** created, so release it here rather than leak it. */
  if( pIncr==0 && pMerger ){
    vdbeMergeEngineFree(pMerger);
    pMerger = 0;
  }

  /* Open the two temp files. */
  if( rc==SQLITE_OK ){
    rc = vdbeSorterOpenTempFile(pTask0->db->pVfs, &pIncr->aFile[0].pFd);
  }
  if( rc==SQLITE_OK ){
    rc = vdbeSorterOpenTempFile(pTask0->db->pVfs, &pIncr->aFile[1].pFd);
  }

  /* Launch a background thread to populate aFile[1]. */
  if( rc==SQLITE_OK ){
    rc = vdbeIncrBgPopulate(pIncr);
  }

  /* Attach the merger even on error so that clearing pIter frees it. */
  pIter->pIncr = pIncr;
  if( rc==SQLITE_OK ){
    rc = vdbePmaReaderNext(pIter);
  }
  return rc;
}
/* /*
** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite, ** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite,
** this function is called to prepare for iterating through the records ** this function is called to prepare for iterating through the records
@ -1520,70 +1781,21 @@ int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
/* Join all threads */ /* Join all threads */
rc = vdbeSorterJoinAll(pSorter, rc); rc = vdbeSorterJoinAll(pSorter, rc);
/* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge vdbeSorterRewindDebug(db, "rewind");
** some of them together so that this is no longer the case. */
if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){
int i;
for(i=0; rc==SQLITE_OK && i<pSorter->nTask; i++){
SortSubtask *pTask = &pSorter->aTask[i];
if( pTask->pTemp1 ){
pTask->nConsolidate = SORTER_MAX_MERGE_COUNT / pSorter->nTask;
pTask->eWork = SORT_SUBTASK_CONS;
#if SQLITE_MAX_WORKER_THREADS>0 /* Assuming no errors have occurred, set up a merger structure to
if( i<(pSorter->nTask-1) ){ ** incrementally read and merge all remaining PMAs. */
void *pCtx = (void*)pTask; assert( pSorter->pReader==0 );
rc = sqlite3ThreadCreate(&pTask->pThread, vdbeSortSubtaskMain, pCtx);
}else
#endif
{
rc = vdbeSorterRunTask(pTask);
}
}
}
}
/* Join all threads */
rc = vdbeSorterJoinAll(pSorter, rc);
/* Assuming no errors have occurred, set up a merger structure to read
** and merge all remaining PMAs. */
assert( pSorter->pMerger==0 );
if( rc==SQLITE_OK ){ if( rc==SQLITE_OK ){
int nIter = 0; /* Number of iterators used */ PmaReader *pReader;
int i; pReader = (PmaReader*)sqlite3DbMallocZero(db, sizeof(PmaReader));
MergeEngine *pMerger; pSorter->pReader = pReader;
for(i=0; i<pSorter->nTask; i++){ rc = vdbePmaReaderIncrInit(pSorter, pReader);
nIter += pSorter->aTask[i].nPMA; assert( rc!=SQLITE_OK || pReader->pFile );
} *pbEof = 0;
pSorter->pMerger = pMerger = vdbeMergeEngineNew(nIter);
if( pMerger==0 ){
rc = SQLITE_NOMEM;
}else{
int iIter = 0;
int iThread = 0;
for(iThread=0; iThread<pSorter->nTask; iThread++){
int iPMA;
i64 iReadOff = 0;
SortSubtask *pTask = &pSorter->aTask[iThread];
for(iPMA=0; iPMA<pTask->nPMA && rc==SQLITE_OK; iPMA++){
i64 nDummy = 0;
PmaReader *pIter = &pMerger->aIter[iIter++];
rc = vdbePmaReaderInit(pTask, iReadOff, pIter, &nDummy);
iReadOff = pIter->iEof;
}
}
for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
rc = vdbeSorterDoCompare(&pSorter->aTask[0], pMerger, i);
}
}
} }
if( rc==SQLITE_OK ){ vdbeSorterRewindDebug(db, "rewinddone");
*pbEof = (pSorter->pMerger->aIter[pSorter->pMerger->aTree[1]].pFile==0);
}
return rc; return rc;
} }
@ -1594,8 +1806,9 @@ int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
VdbeSorter *pSorter = pCsr->pSorter; VdbeSorter *pSorter = pCsr->pSorter;
int rc; /* Return code */ int rc; /* Return code */
if( pSorter->pMerger ){ if( pSorter->pReader ){
rc = vdbeSorterNext(&pSorter->aTask[0], pSorter->pMerger, pbEof); rc = vdbePmaReaderNext(pSorter->pReader);
*pbEof = (pSorter->pReader->pFile==0);
}else{ }else{
SorterRecord *pFree = pSorter->pRecord; SorterRecord *pFree = pSorter->pRecord;
pSorter->pRecord = pFree->u.pNext; pSorter->pRecord = pFree->u.pNext;
@ -1616,11 +1829,9 @@ static void *vdbeSorterRowkey(
int *pnKey /* OUT: Size of current key in bytes */ int *pnKey /* OUT: Size of current key in bytes */
){ ){
void *pKey; void *pKey;
if( pSorter->pMerger ){ if( pSorter->pReader ){
PmaReader *pIter; *pnKey = pSorter->pReader->nKey;
pIter = &pSorter->pMerger->aIter[ pSorter->pMerger->aTree[1] ]; pKey = pSorter->pReader->aKey;
*pnKey = pIter->nKey;
pKey = pIter->aKey;
}else{ }else{
*pnKey = pSorter->pRecord->nVal; *pnKey = pSorter->pRecord->nVal;
pKey = SRVAL(pSorter->pRecord); pKey = SRVAL(pSorter->pRecord);
@ -1669,13 +1880,19 @@ int sqlite3VdbeSorterCompare(
int *pRes /* OUT: Result of comparison */ int *pRes /* OUT: Result of comparison */
){ ){
VdbeSorter *pSorter = pCsr->pSorter; VdbeSorter *pSorter = pCsr->pSorter;
UnpackedRecord *r2 = pSorter->aTask[0].pUnpacked; UnpackedRecord *r2 = pSorter->pUnpacked;
KeyInfo *pKeyInfo = pCsr->pKeyInfo; KeyInfo *pKeyInfo = pCsr->pKeyInfo;
int i; int i;
void *pKey; int nKey; /* Sorter key to compare pVal with */ void *pKey; int nKey; /* Sorter key to compare pVal with */
if( r2==0 ){
char *p;
r2 = pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pKeyInfo,0,0,&p);
assert( pSorter->pUnpacked==(UnpackedRecord*)p );
if( r2==0 ) return SQLITE_NOMEM;
r2->nField = pKeyInfo->nField-nIgnore;
}
assert( r2->nField>=pKeyInfo->nField-nIgnore ); assert( r2->nField>=pKeyInfo->nField-nIgnore );
r2->nField = pKeyInfo->nField-nIgnore;
pKey = vdbeSorterRowkey(pSorter, &nKey); pKey = vdbeSorterRowkey(pSorter, &nKey);
sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2); sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2);

View File

@ -47,6 +47,12 @@ do_execsql_test 2.2 {
CREATE UNIQUE INDEX i1 ON t1(b, a); CREATE UNIQUE INDEX i1 ON t1(b, a);
} }
do_execsql_test 2.3 {
CREATE UNIQUE INDEX i2 ON t1(a);
}
do_execsql_test 2.4 { PRAGMA integrity_check } {ok}
db close db close
sqlite3_shutdown sqlite3_shutdown
sqlite3_config_worker_threads 0 sqlite3_config_worker_threads 0