mirror of
https://github.com/sqlite/sqlite.git
synced 2025-11-12 13:01:09 +03:00
Reorganize some multi-threaded code in vdbesort.c so that full MC/DC test coverage does not depend on the outcome of a race condition.
FossilOrigin-Name: 78c7ec95931265b89a92f6a799fc9b1a9f0476bf
This commit is contained in:
166
src/vdbesort.c
166
src/vdbesort.c
@@ -2063,11 +2063,12 @@ static void vdbeMergeEngineCompare(
|
||||
#define INCRINIT_TASK 1
|
||||
#define INCRINIT_ROOT 2
|
||||
|
||||
/* Forward reference.
|
||||
** The vdbeIncrMergeInit() and vdbePmaReaderIncrMergeInit() routines call each
|
||||
** other (when building a merge tree).
|
||||
/*
|
||||
** Forward reference required as the vdbeIncrMergeInit() and
|
||||
** vdbePmaReaderIncrInit() routines are called mutually recursively when
|
||||
** building a merge tree.
|
||||
*/
|
||||
static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode);
|
||||
static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode);
|
||||
|
||||
/*
|
||||
** Initialize the MergeEngine object passed as the second argument. Once this
|
||||
@@ -2114,7 +2115,7 @@ static int vdbeMergeEngineInit(
|
||||
** better advantage of multi-processor hardware. */
|
||||
rc = vdbePmaReaderNext(&pMerger->aReadr[nTree-i-1]);
|
||||
}else{
|
||||
rc = vdbePmaReaderIncrMergeInit(&pMerger->aReadr[i], INCRINIT_NORMAL);
|
||||
rc = vdbePmaReaderIncrInit(&pMerger->aReadr[i], INCRINIT_NORMAL);
|
||||
}
|
||||
if( rc!=SQLITE_OK ) return rc;
|
||||
}
|
||||
@@ -2126,17 +2127,15 @@ static int vdbeMergeEngineInit(
|
||||
}
|
||||
|
||||
/*
|
||||
** Initialize the IncrMerge field of a PmaReader.
|
||||
**
|
||||
** If the PmaReader passed as the first argument is not an incremental-reader
|
||||
** (if pReadr->pIncr==0), then this function is a no-op. Otherwise, it serves
|
||||
** to open and/or initialize the temp file related fields of the IncrMerge
|
||||
** The PmaReader passed as the first argument is guaranteed to be an
|
||||
** incremental-reader (pReadr->pIncr!=0). This function serves to open
|
||||
** and/or initialize the temp file related fields of the IncrMerge
|
||||
** object at (pReadr->pIncr).
|
||||
**
|
||||
** If argument eMode is set to INCRINIT_NORMAL, then all PmaReaders
|
||||
** in the sub-tree headed by pReadr are also initialized. Data is then loaded
|
||||
** into the buffers belonging to pReadr and it is set to
|
||||
** point to the first key in its range.
|
||||
** in the sub-tree headed by pReadr are also initialized. Data is then
|
||||
** loaded into the buffers belonging to pReadr and it is set to point to
|
||||
** the first key in its range.
|
||||
**
|
||||
** If argument eMode is set to INCRINIT_TASK, then pReadr is guaranteed
|
||||
** to be a multi-threaded PmaReader and this function is being called in a
|
||||
@@ -2163,59 +2162,62 @@ static int vdbeMergeEngineInit(
|
||||
static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode){
|
||||
int rc = SQLITE_OK;
|
||||
IncrMerger *pIncr = pReadr->pIncr;
|
||||
SortSubtask *pTask = pIncr->pTask;
|
||||
sqlite3 *db = pTask->pSorter->db;
|
||||
|
||||
/* eMode is always INCRINIT_NORMAL in single-threaded mode */
|
||||
assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );
|
||||
|
||||
if( pIncr ){
|
||||
SortSubtask *pTask = pIncr->pTask;
|
||||
sqlite3 *db = pTask->pSorter->db;
|
||||
rc = vdbeMergeEngineInit(pTask, pIncr->pMerger, eMode);
|
||||
|
||||
rc = vdbeMergeEngineInit(pTask, pIncr->pMerger, eMode);
|
||||
|
||||
/* Set up the required files for pIncr. A multi-theaded IncrMerge object
|
||||
** requires two temp files to itself, whereas a single-threaded object
|
||||
** only requires a region of pTask->file2. */
|
||||
if( rc==SQLITE_OK ){
|
||||
int mxSz = pIncr->mxSz;
|
||||
/* Set up the required files for pIncr. A multi-theaded IncrMerge object
|
||||
** requires two temp files to itself, whereas a single-threaded object
|
||||
** only requires a region of pTask->file2. */
|
||||
if( rc==SQLITE_OK ){
|
||||
int mxSz = pIncr->mxSz;
|
||||
#if SQLITE_MAX_WORKER_THREADS>0
|
||||
if( pIncr->bUseThread ){
|
||||
rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[0].pFd);
|
||||
if( rc==SQLITE_OK ){
|
||||
rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[1].pFd);
|
||||
}
|
||||
}else
|
||||
if( pIncr->bUseThread ){
|
||||
rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[0].pFd);
|
||||
if( rc==SQLITE_OK ){
|
||||
rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[1].pFd);
|
||||
}
|
||||
}else
|
||||
#endif
|
||||
/*if( !pIncr->bUseThread )*/{
|
||||
if( pTask->file2.pFd==0 ){
|
||||
assert( pTask->file2.iEof>0 );
|
||||
rc = vdbeSorterOpenTempFile(db, pTask->file2.iEof, &pTask->file2.pFd);
|
||||
pTask->file2.iEof = 0;
|
||||
}
|
||||
if( rc==SQLITE_OK ){
|
||||
pIncr->aFile[1].pFd = pTask->file2.pFd;
|
||||
pIncr->iStartOff = pTask->file2.iEof;
|
||||
pTask->file2.iEof += mxSz;
|
||||
}
|
||||
/*if( !pIncr->bUseThread )*/{
|
||||
if( pTask->file2.pFd==0 ){
|
||||
assert( pTask->file2.iEof>0 );
|
||||
rc = vdbeSorterOpenTempFile(db, pTask->file2.iEof, &pTask->file2.pFd);
|
||||
pTask->file2.iEof = 0;
|
||||
}
|
||||
if( rc==SQLITE_OK ){
|
||||
pIncr->aFile[1].pFd = pTask->file2.pFd;
|
||||
pIncr->iStartOff = pTask->file2.iEof;
|
||||
pTask->file2.iEof += mxSz;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#if SQLITE_MAX_WORKER_THREADS>0
|
||||
if( rc==SQLITE_OK && pIncr->bUseThread ){
|
||||
/* Use the current thread to populate aFile[1], even though this
|
||||
** PmaReader is multi-threaded. The reason being that this function
|
||||
** is already running in background thread pIncr->pTask->thread. */
|
||||
assert( eMode==INCRINIT_ROOT || eMode==INCRINIT_TASK );
|
||||
rc = vdbeIncrPopulate(pIncr);
|
||||
}
|
||||
if( rc==SQLITE_OK && pIncr->bUseThread ){
|
||||
/* Use the current thread to populate aFile[1], even though this
|
||||
** PmaReader is multi-threaded. If this is an INCRINIT_TASK object,
|
||||
** then this function is already running in background thread
|
||||
** pIncr->pTask->thread.
|
||||
**
|
||||
** If this is the INCRINIT_ROOT object, then it is running in the
|
||||
** main VDBE thread. But that is Ok, as that thread cannot return
|
||||
** control to the VDBE or proceed with anything useful until the
|
||||
** first results are ready from this merger object anyway.
|
||||
*/
|
||||
assert( eMode==INCRINIT_ROOT || eMode==INCRINIT_TASK );
|
||||
rc = vdbeIncrPopulate(pIncr);
|
||||
}
|
||||
#endif
|
||||
|
||||
if( rc==SQLITE_OK
|
||||
&& (SQLITE_MAX_WORKER_THREADS==0 || eMode!=INCRINIT_TASK)
|
||||
){
|
||||
rc = vdbePmaReaderNext(pReadr);
|
||||
}
|
||||
if( rc==SQLITE_OK && (SQLITE_MAX_WORKER_THREADS==0 || eMode!=INCRINIT_TASK) ){
|
||||
rc = vdbePmaReaderNext(pReadr);
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
@@ -2224,7 +2226,7 @@ static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode){
|
||||
** The main routine for vdbePmaReaderIncrMergeInit() operations run in
|
||||
** background threads.
|
||||
*/
|
||||
static void *vdbePmaReaderBgInit(void *pCtx){
|
||||
static void *vdbePmaReaderBgIncrInit(void *pCtx){
|
||||
PmaReader *pReader = (PmaReader*)pCtx;
|
||||
void *pRet = SQLITE_INT_TO_PTR(
|
||||
vdbePmaReaderIncrMergeInit(pReader,INCRINIT_TASK)
|
||||
@@ -2232,20 +2234,36 @@ static void *vdbePmaReaderBgInit(void *pCtx){
|
||||
pReader->pIncr->pTask->bDone = 1;
|
||||
return pRet;
|
||||
}
|
||||
#endif
|
||||
|
||||
/*
|
||||
** Use a background thread to invoke vdbePmaReaderIncrMergeInit(INCRINIT_TASK)
|
||||
** on the PmaReader object passed as the first argument.
|
||||
**
|
||||
** This call will initialize the various fields of the pReadr->pIncr
|
||||
** structure and, if it is a multi-threaded IncrMerger, launch a
|
||||
** background thread to populate aFile[1].
|
||||
** If the PmaReader passed as the first argument is not an incremental-reader
|
||||
** (if pReadr->pIncr==0), then this function is a no-op. Otherwise, it invokes
|
||||
** the vdbePmaReaderIncrMergeInit() function with the parameters passed to
|
||||
** this routine to initialize the incremental merge.
|
||||
**
|
||||
** If the IncrMerger object is multi-threaded (IncrMerger.bUseThread==1),
|
||||
** then a background thread is launched to call vdbePmaReaderIncrMergeInit().
|
||||
** Or, if the IncrMerger is single threaded, the same function is called
|
||||
** using the current thread.
|
||||
*/
|
||||
static int vdbePmaReaderBgIncrInit(PmaReader *pReadr){
|
||||
void *pCtx = (void*)pReadr;
|
||||
return vdbeSorterCreateThread(pReadr->pIncr->pTask, vdbePmaReaderBgInit, pCtx);
|
||||
}
|
||||
static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode){
|
||||
IncrMerger *pIncr = pReadr->pIncr; /* Incremental merger */
|
||||
int rc = SQLITE_OK; /* Return code */
|
||||
if( pIncr ){
|
||||
#if SQLITE_MAX_WORKER_THREADS>0
|
||||
assert( pIncr->bUseThread==0 || eMode==INCRINIT_TASK );
|
||||
if( pIncr->bUseThread ){
|
||||
void *pCtx = (void*)pReadr;
|
||||
rc = vdbeSorterCreateThread(pIncr->pTask, vdbePmaReaderBgIncrInit, pCtx);
|
||||
}else
|
||||
#endif
|
||||
{
|
||||
rc = vdbePmaReaderIncrMergeInit(pReadr, eMode);
|
||||
}
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
/*
|
||||
** Allocate a new MergeEngine object to merge the contents of nPMA level-0
|
||||
@@ -2490,15 +2508,21 @@ static int vdbeSorterSetupMerge(VdbeSorter *pSorter){
|
||||
}
|
||||
}
|
||||
for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
|
||||
/* Check that:
|
||||
**
|
||||
** a) The incremental merge object is configured to use the
|
||||
** right task, and
|
||||
** b) If it is using task (nTask-1), it is configured to run
|
||||
** in single-threaded mode. This is important, as the
|
||||
** root merge (INCRINIT_ROOT) will be using the same task
|
||||
** object.
|
||||
*/
|
||||
PmaReader *p = &pMain->aReadr[iTask];
|
||||
assert( p->pIncr==0 || p->pIncr->pTask==&pSorter->aTask[iTask] );
|
||||
if( p->pIncr ){
|
||||
if( iTask==pSorter->nTask-1 ){
|
||||
rc = vdbePmaReaderIncrMergeInit(p, INCRINIT_TASK);
|
||||
}else{
|
||||
rc = vdbePmaReaderBgIncrInit(p);
|
||||
}
|
||||
}
|
||||
assert( p->pIncr==0 || (
|
||||
(p->pIncr->pTask==&pSorter->aTask[iTask]) /* a */
|
||||
&& (iTask!=pSorter->nTask-1 || p->pIncr->bUseThread==0) /* b */
|
||||
));
|
||||
rc = vdbePmaReaderIncrInit(p, INCRINIT_TASK);
|
||||
}
|
||||
}
|
||||
pMain = 0;
|
||||
|
||||
Reference in New Issue
Block a user