From ef7a6304961a4f323e3ceb10227777be7293c73a Mon Sep 17 00:00:00 2001 From: dan Date: Tue, 23 Sep 2014 20:39:55 +0000 Subject: [PATCH] Begin adding 'streaming' APIs to sessions module. This is a work in progress. FossilOrigin-Name: 3c7d3d950bbf5f5ed3696ebc61c77ca48bafe2b5 --- ext/session/sqlite3session.c | 436 ++++++++++++++++++++++++++--------- ext/session/sqlite3session.h | 58 +++++ ext/session/test_session.c | 68 +++++- manifest | 20 +- manifest.uuid | 2 +- test/permutations.test | 8 + 6 files changed, 467 insertions(+), 125 deletions(-) diff --git a/ext/session/sqlite3session.c b/ext/session/sqlite3session.c index 4617e6a3c2..ec0387e775 100644 --- a/ext/session/sqlite3session.c +++ b/ext/session/sqlite3session.c @@ -12,6 +12,12 @@ typedef struct SessionTable SessionTable; typedef struct SessionChange SessionChange; typedef struct SessionBuffer SessionBuffer; +typedef struct SessionInput SessionInput; + +/* +** Minimum chunk size used by streaming versions of functions. +*/ +#define SESSIONS_STR_CHUNK_SIZE 1024 /* ** Session handle structure. @@ -29,14 +35,38 @@ struct sqlite3_session { SessionTable *pTable; /* List of attached tables */ }; +/* +** Instances of this structure are used to build strings or binary records. +*/ +struct SessionBuffer { + u8 *aBuf; /* Pointer to changeset buffer */ + int nBuf; /* Size of buffer aBuf */ + int nAlloc; /* Size of allocation containing aBuf */ +}; + +/* +** An object of this type is used internally as an abstraction for the +** input data read by changeset iterators. Input data may be supplied +** either as a single large buffer (sqlite3changeset_start()) or using +** a stream function (sqlite3changeset_start_str()). +*/ +struct SessionInput { + int iNext; /* Offset in aChangeset[] of next change */ + u8 *aChangeset; /* Pointer to buffer containing changeset */ + int nChangeset; /* Number of bytes in aChangeset */ + SessionBuffer buf; /* Current read buffer */ + int (*xInput)(void*, void*, int*); /* Input stream call (or NULL) */ + void *pIn; /* First argument to xInput */ + int bEof; /* Set to true after xInput finished */ +}; + /* ** Structure for changeset iterators. */ struct sqlite3_changeset_iter { - u8 *aChangeset; /* Pointer to buffer containing changeset */ - int nChangeset; /* Number of bytes in aChangeset */ + SessionInput in; /* Input buffer or stream */ + SessionBuffer tblhdr; /* Buffer to hold apValue/zTab/abPK/ */ int bPatchset; /* True if this is a patchset */ - u8 *pNext; /* Pointer to next change within aChangeset */ int rc; /* Iterator error code */ sqlite3_stmt *pConflict; /* Points to conflicting row, if any */ char *zTab; /* Current table */ @@ -165,15 +195,6 @@ struct SessionChange { SessionChange *pNext; /* For hash-table collisions */ }; -/* -** Instances of this structure are used to build strings or binary records. -*/ -struct SessionBuffer { - u8 *aBuf; /* Pointer to changeset buffer */ - int nBuf; /* Size of buffer aBuf */ - int nAlloc; /* Size of allocation containing aBuf */ -}; - /* ** Write a varint with value iVal into the buffer at aBuf. Return the ** number of bytes written. @@ -1291,7 +1312,7 @@ static int sessionBufferGrow(SessionBuffer *p, int nByte, int *pRc){ int nNew = p->nAlloc ? p->nAlloc : 128; do { nNew = nNew*2; - }while( nNew<(p->nAlloc+nByte) ); + }while( nNew<(p->nBuf+nByte) ); aNew = (u8 *)sqlite3_realloc(p->aBuf, nNew); if( 0==aNew ){ @@ -1776,6 +1797,8 @@ static void sessionAppendTableHdr( int sessionGenerateChangeset( sqlite3_session *pSession, /* Session object */ int bPatchset, /* True for patchset, false for changeset */ + int (*xOutput)(void *pOut, const void *pData, int nData), + void *pOut, /* First argument for xOutput */ int *pnChangeset, /* OUT: Size of buffer at *ppChangeset */ void **ppChangeset /* OUT: Buffer containing changeset */ ){ @@ -1784,11 +1807,15 @@ int sessionGenerateChangeset( SessionBuffer buf = {0,0,0}; /* Buffer in which to accumlate changeset */ int rc; /* Return code */ + assert( xOutput==0 || (pnChangeset==0 && ppChangeset==0 ) ); + /* Zero the output variables in case an error occurs. If this session ** object is already in the error state (sqlite3_session.rc != SQLITE_OK), ** this call will be a no-op. */ - *pnChangeset = 0; - *ppChangeset = 0; + if( xOutput==0 ){ + *pnChangeset = 0; + *ppChangeset = 0; + } if( pSession->rc ) return pSession->rc; rc = sqlite3_exec(pSession->db, "SAVEPOINT changeset", 0, 0, 0); @@ -1846,6 +1873,19 @@ int sessionGenerateChangeset( if( rc==SQLITE_OK ){ rc = sqlite3_reset(pSel); } + + /* If the buffer is now larger than SESSIONS_STR_CHUNK_SIZE, pass + ** its contents to the xOutput() callback. */ + if( xOutput + && rc==SQLITE_OK + && buf.nBuf>nNoop + && buf.nBuf>SESSIONS_STR_CHUNK_SIZE + ){ + rc = xOutput(pOut, (void*)buf.aBuf, buf.nBuf); + nNoop = -1; + buf.nBuf = 0; + } + } } @@ -1858,12 +1898,16 @@ int sessionGenerateChangeset( } if( rc==SQLITE_OK ){ - *pnChangeset = buf.nBuf; - *ppChangeset = buf.aBuf; - }else{ - sqlite3_free(buf.aBuf); + if( xOutput==0 ){ + *pnChangeset = buf.nBuf; + *ppChangeset = buf.aBuf; + buf.aBuf = 0; + }else if( buf.nBuf>0 ){ + rc = xOutput(pOut, (void*)buf.aBuf, buf.nBuf); + } } + sqlite3_free(buf.aBuf); sqlite3_exec(db, "RELEASE changeset", 0, 0, 0); sqlite3_mutex_leave(sqlite3_db_mutex(db)); return rc; @@ -1881,7 +1925,29 @@ int sqlite3session_changeset( int *pnChangeset, /* OUT: Size of buffer at *ppChangeset */ void **ppChangeset /* OUT: Buffer containing changeset */ ){ - return sessionGenerateChangeset(pSession, 0, pnChangeset, ppChangeset); + return sessionGenerateChangeset(pSession, 0, 0, 0, pnChangeset, ppChangeset); +} + +/* +** Streaming version of sqlite3session_changeset(). +*/ +int sqlite3session_changeset_str( + sqlite3_session *pSession, + int (*xOutput)(void *pOut, const void *pData, int nData), + void *pOut +){ + return sessionGenerateChangeset(pSession, 0, xOutput, pOut, 0, 0); +} + +/* +** Streaming version of sqlite3session_patchset(). +*/ +int sqlite3session_patchset_str( + sqlite3_session *pSession, + int (*xOutput)(void *pOut, const void *pData, int nData), + void *pOut +){ + return sessionGenerateChangeset(pSession, 1, xOutput, pOut, 0, 0); } /* @@ -1896,7 +1962,7 @@ int sqlite3session_patchset( int *pnPatchset, /* OUT: Size of buffer at *ppChangeset */ void **ppPatchset /* OUT: Buffer containing changeset */ ){ - return sessionGenerateChangeset(pSession, 1, pnPatchset, ppPatchset); + return sessionGenerateChangeset(pSession, 1, 0, 0, pnPatchset, ppPatchset); } /* @@ -1945,16 +2011,20 @@ int sqlite3session_isempty(sqlite3_session *pSession){ } /* -** Create an iterator used to iterate through the contents of a changeset. +** Do the work for either sqlite3changeset_start() or start_str(). */ -int sqlite3changeset_start( +int sessionChangesetStart( sqlite3_changeset_iter **pp, /* OUT: Changeset iterator handle */ + int (*xInput)(void *pIn, void *pData, int *pnData), + void *pIn, int nChangeset, /* Size of buffer pChangeset in bytes */ void *pChangeset /* Pointer to buffer containing changeset */ ){ sqlite3_changeset_iter *pRet; /* Iterator to return */ int nByte; /* Number of bytes to allocate for iterator */ + assert( xInput==0 || (pChangeset==0 && nChangeset==0) ); + /* Zero the output variable in case an error occurs. */ *pp = 0; @@ -1963,15 +2033,80 @@ int sqlite3changeset_start( pRet = (sqlite3_changeset_iter *)sqlite3_malloc(nByte); if( !pRet ) return SQLITE_NOMEM; memset(pRet, 0, sizeof(sqlite3_changeset_iter)); - pRet->aChangeset = (u8 *)pChangeset; - pRet->nChangeset = nChangeset; - pRet->pNext = pRet->aChangeset; + pRet->in.aChangeset = (u8 *)pChangeset; + pRet->in.nChangeset = nChangeset; + pRet->in.xInput = xInput; + pRet->in.pIn = pIn; + pRet->in.iNext = 0; + pRet->in.bEof = (xInput ? 0 : 1); /* Populate the output variable and return success. */ *pp = pRet; return SQLITE_OK; } +/* +** Create an iterator used to iterate through the contents of a changeset. +*/ +int sqlite3changeset_start( + sqlite3_changeset_iter **pp, /* OUT: Changeset iterator handle */ + int nChangeset, /* Size of buffer pChangeset in bytes */ + void *pChangeset /* Pointer to buffer containing changeset */ +){ + return sessionChangesetStart(pp, 0, 0, nChangeset, pChangeset); +} + +/* +** Streaming version of sqlite3changeset_start(). +*/ +int sqlite3changeset_start_str( + sqlite3_changeset_iter **pp, /* OUT: Changeset iterator handle */ + int (*xInput)(void *pIn, void *pData, int *pnData), + void *pIn +){ + return sessionChangesetStart(pp, xInput, pIn, 0, 0); +} + +/* +** Ensure that there are at least nByte bytes available in the buffer. Or, +** if there are not nByte bytes remaining in the input, that all available +** data is in the buffer. +** +** Return an SQLite error code if an error occurs, or SQLITE_OK otherwise. +*/ +static int sessionInputBuffer(SessionInput *pInput, int nByte){ + int rc = SQLITE_OK; + if( pInput->xInput && !pInput->bEof ){ + assert( 0 ); + } + return rc; +} + +/* +** When this function is called, *ppRec points to the start of a record +** that contains nCol values. This function advances the pointer *ppRec +** until it points to the byte immediately following that record. +*/ +static void sessionSkipRecord( + u8 **ppRec, /* IN/OUT: Record pointer */ + int nCol /* Number of values in record */ +){ + u8 *aRec = *ppRec; + int i; + for(i=0; iaChangeset[pIn->iNext++]; + } + assert( !apOut || apOut[i]==0 ); if( eType ){ if( apOut ){ apOut[i] = sqlite3ValueNew(0); - if( !apOut[i] ) return SQLITE_NOMEM; + if( !apOut[i] ) rc = SQLITE_NOMEM; } + } + if( rc==SQLITE_OK ){ + u8 *aVal = &pIn->aChangeset[pIn->iNext]; if( eType==SQLITE_TEXT || eType==SQLITE_BLOB ){ int nByte; - aRec += sessionVarintGet(aRec, &nByte); - if( apOut ){ + pIn->iNext += sessionVarintGet(aVal, &nByte); + rc = sessionInputBuffer(pIn, nByte); + if( apOut && rc==SQLITE_OK ){ + u8 *aRec = &pIn->aChangeset[pIn->iNext]; u8 enc = (eType==SQLITE_TEXT ? SQLITE_UTF8 : 0); sqlite3ValueSetStr(apOut[i], nByte, (char *)aRec, enc, SQLITE_STATIC); } - aRec += nByte; + pIn->iNext += nByte; } if( eType==SQLITE_INTEGER || eType==SQLITE_FLOAT ){ if( apOut ){ - sqlite3_int64 v = sessionGetI64(aRec); + sqlite3_int64 v = sessionGetI64(aVal); if( eType==SQLITE_INTEGER ){ sqlite3VdbeMemSetInt64(apOut[i], v); }else{ @@ -2036,13 +2180,83 @@ static int sessionReadRecord( sqlite3VdbeMemSetDouble(apOut[i], d); } } - aRec += 8; + pIn->iNext += 8; } } } - *paChange = aRec; - return SQLITE_OK; + return rc; +} + +/* +** The input pointer currently points to the second byte of a table-header. +** Specifically, to the following: +** +** + number of columns in table (varint) +** + array of PK flags (1 byte per column), +** + table name (nul terminated). +** +** This function ensures that all of the above is present in the input +** buffer (i.e. that it can be accessed without any calls to xInput()). +** If successful, SQLITE_OK is returned. Otherwise, an SQLite error code. +** The input pointer is not moved. +*/ +static int sessionChangesetBufferTblhdr(SessionInput *pIn, int *pnByte){ + int rc = SQLITE_OK; + int nCol = 0; + int iIn = pIn->iNext; + + rc = sessionInputBuffer(pIn, 9); + if( rc==SQLITE_OK ){ + iIn += sessionVarintGet(&pIn->aChangeset[iIn], &nCol); + rc = sessionInputBuffer(pIn, nCol+100); + iIn += nCol; + } + while( rc==SQLITE_OK ){ + while( iInnChangeset && pIn->aChangeset[iIn] ) iIn++; + if( pIn->aChangeset[iIn]==0 ) break; + rc = sessionInputBuffer(pIn, 100); + } + if( pnByte ) *pnByte = (iIn+1 - pIn->iNext); + return rc; +} + +/* +** The input pointer currently points to the second byte of a table-header. +** Specifically, to the following: +** +** + number of columns in table (varint) +** + array of PK flags (1 byte per column), +** + table name (nul terminated). +*/ +static int sessionChangesetReadTblhdr(sqlite3_changeset_iter *p){ + int rc; + int nCopy; + assert( p->rc==SQLITE_OK ); + + rc = sessionChangesetBufferTblhdr(&p->in, &nCopy); + if( rc==SQLITE_OK ){ + int nByte; + int nVarint; + nVarint = sessionVarintGet(&p->in.aChangeset[p->in.iNext], &p->nCol); + nCopy -= nVarint; + p->in.iNext += nVarint; + nByte = p->nCol * sizeof(sqlite3_value*) * 2 + nCopy; + p->tblhdr.nBuf = 0; + sessionBufferGrow(&p->tblhdr, nByte, &rc); + } + + if( rc==SQLITE_OK ){ + int iPK = sizeof(sqlite3_value*)*p->nCol*2; + memset(p->tblhdr.aBuf, 0, iPK); + memcpy(&p->tblhdr.aBuf[iPK], &p->in.aChangeset[p->in.iNext], nCopy); + p->in.iNext += nCopy; + } + + p->apValue = (sqlite3_value**)p->tblhdr.aBuf; + p->abPK = (u8*)&p->apValue[p->nCol*2]; + p->zTab = (char*)&p->abPK[p->nCol]; + return (p->rc = rc); } /* @@ -2066,9 +2280,10 @@ static int sessionChangesetNext( u8 **paRec, /* If non-NULL, store record pointer here */ int *pnRec /* If non-NULL, store size of record here */ ){ - u8 *aChange; int i; + u8 op; + assert( paRec==0 || p->in.xInput==0 ); /* fixme! */ assert( (paRec==0 && pnRec==0) || (paRec && pnRec) ); /* If the iterator is in the error-state, return immediately. */ @@ -2082,57 +2297,50 @@ static int sessionChangesetNext( memset(p->apValue, 0, sizeof(sqlite3_value*)*p->nCol*2); } + /* Make sure the buffer contains at least 10 bytes of input data, or all + ** remaining data if there are less than 10 bytes available. This is + ** sufficient either for the 'T' or 'P' byte and the varint that follows + ** it, or for the two single byte values otherwise. */ + p->rc = sessionInputBuffer(&p->in, 2); + if( p->rc!=SQLITE_OK ) return p->rc; + /* If the iterator is already at the end of the changeset, return DONE. */ - if( p->pNext>=&p->aChangeset[p->nChangeset] ){ + if( p->in.iNext>=p->in.nChangeset ){ return SQLITE_DONE; } - aChange = p->pNext; - if( aChange[0]=='T' || aChange[0]=='P' ){ - int nByte; /* Bytes to allocate for apValue */ - p->bPatchset = (aChange[0]=='P'); - aChange++; - aChange += sessionVarintGet(aChange, &p->nCol); - p->abPK = (u8 *)aChange; - aChange += p->nCol; - p->zTab = (char *)aChange; - aChange += (sqlite3Strlen30((char *)aChange) + 1); - - if( paRec==0 ){ - sqlite3_free(p->apValue); - nByte = sizeof(sqlite3_value *) * p->nCol * 2; - p->apValue = (sqlite3_value **)sqlite3_malloc(nByte); - if( !p->apValue ){ - return (p->rc = SQLITE_NOMEM); - } - memset(p->apValue, 0, sizeof(sqlite3_value*)*p->nCol*2); - } + op = p->in.aChangeset[p->in.iNext++]; + if( op=='T' || op=='P' ){ + p->bPatchset = (op=='P'); + if( sessionChangesetReadTblhdr(p) ) return p->rc; + if( (p->rc = sessionInputBuffer(&p->in, 2)) ) return p->rc; + op = p->in.aChangeset[p->in.iNext++]; } - p->op = *(aChange++); - p->bIndirect = *(aChange++); + p->op = op; + p->bIndirect = p->in.aChangeset[p->in.iNext++]; if( p->op!=SQLITE_UPDATE && p->op!=SQLITE_DELETE && p->op!=SQLITE_INSERT ){ return (p->rc = SQLITE_CORRUPT); } - if( paRec ){ *paRec = aChange; } + if( paRec ){ *paRec = &p->in.aChangeset[p->in.iNext]; } /* If this is an UPDATE or DELETE, read the old.* record. */ if( p->op!=SQLITE_INSERT && (p->bPatchset==0 || p->op==SQLITE_DELETE) ){ u8 *abPK = p->bPatchset ? p->abPK : 0; - p->rc = sessionReadRecord(&aChange, p->nCol, abPK, paRec?0:p->apValue); + p->rc = sessionReadRecord(&p->in, p->nCol, abPK, paRec?0:p->apValue); if( p->rc!=SQLITE_OK ) return p->rc; } /* If this is an INSERT or UPDATE, read the new.* record. */ if( p->op!=SQLITE_DELETE ){ sqlite3_value **apOut = (paRec ? 0 : &p->apValue[p->nCol]); - p->rc = sessionReadRecord(&aChange, p->nCol, 0, apOut); + p->rc = sessionReadRecord(&p->in, p->nCol, 0, apOut); if( p->rc!=SQLITE_OK ) return p->rc; } - if( pnRec ){ - *pnRec = (int)(aChange - *paRec); + if( pnRec ){ + *pnRec = (int)(&p->in.aChangeset[p->in.iNext] - *paRec); }else if( p->bPatchset && p->op==SQLITE_UPDATE ){ /* If this is an UPDATE that is part of a patchset, then all PK and ** modified fields are present in the new.* record. The old.* record @@ -2148,7 +2356,7 @@ static int sessionChangesetNext( } } } - p->pNext = aChange; + return SQLITE_ROW; } @@ -2321,7 +2529,7 @@ int sqlite3changeset_finalize(sqlite3_changeset_iter *p){ if( p->apValue ){ for(i=0; inCol*2; i++) sqlite3ValueFree(p->apValue[i]); } - sqlite3_free(p->apValue); + sqlite3_free(p->tblhdr.aBuf); sqlite3_free(p); return rc; } @@ -2339,52 +2547,71 @@ int sqlite3changeset_invert( u8 *aOut; u8 *aIn; int i; + SessionInput sInput; int nCol = 0; /* Number of cols in current table */ u8 *abPK = 0; /* PK array for current table */ sqlite3_value **apVal = 0; /* Space for values for UPDATE inversion */ + SessionBuffer sPK = {0, 0, 0}; /* PK array for current table */ /* Zero the output variables in case an error occurs. */ *ppInverted = 0; *pnInverted = 0; if( nChangeset==0 ) return SQLITE_OK; + /* Set up the input stream */ + memset(&sInput, 0, sizeof(SessionInput)); + sInput.nChangeset = nChangeset; + sInput.aChangeset = (u8*)pChangeset; + aOut = (u8 *)sqlite3_malloc(nChangeset); if( !aOut ) return SQLITE_NOMEM; aIn = (u8 *)pChangeset; i = 0; while( iop = SQLITE_INSERT; - if( bPatchset==0 ) sessionReadRecord(&a1, pTab->nCol, 0, 0); + if( bPatchset==0 ) sessionSkipRecord(&a1, pTab->nCol); sessionMergeRecord(&aCsr, pTab->nCol, aExist, a1); }else if( op1==SQLITE_DELETE ){ /* DELETE + INSERT */ assert( op2==SQLITE_INSERT ); @@ -3290,8 +3518,8 @@ static int sessionChangeMerge( u8 *a2 = aRec; assert( op1==SQLITE_UPDATE ); if( bPatchset==0 ){ - sessionReadRecord(&a1, pTab->nCol, 0, 0); - sessionReadRecord(&a2, pTab->nCol, 0, 0); + sessionSkipRecord(&a1, pTab->nCol); + sessionSkipRecord(&a2, pTab->nCol); } pNew->op = SQLITE_UPDATE; if( 0==sessionMergeUpdate(&aCsr, pTab, bPatchset, aRec, aExist,a1,a2) ){ @@ -3356,14 +3584,8 @@ static int sessionConcatChangeset( break; } - assert( pIter->apValue==0 ); sqlite3changeset_op(pIter, &zNew, &nCol, &op, &bIndirect); - - assert( zNew>=(char *)pChangeset && zNew-nChangeset<((char *)pChangeset) ); - assert( !pTab || pTab->zName-nChangeset<(char *)pChangeset ); - assert( !pTab || zNew>=pTab->zName ); - - if( !pTab || zNew!=pTab->zName ){ + if( !pTab || sqlite3_stricmp(zNew, pTab->zName) ){ /* Search the list for a matching table */ int nNew = (int)strlen(zNew); u8 *abPK; @@ -3373,21 +3595,23 @@ static int sessionConcatChangeset( if( 0==sqlite3_strnicmp(pTab->zName, zNew, nNew+1) ) break; } if( !pTab ){ - pTab = sqlite3_malloc(sizeof(SessionTable)); + pTab = sqlite3_malloc(sizeof(SessionTable) + nCol + nNew+1); if( !pTab ){ rc = SQLITE_NOMEM; break; } memset(pTab, 0, sizeof(SessionTable)); pTab->pNext = *ppTabList; - pTab->abPK = abPK; pTab->nCol = nCol; + pTab->abPK = (u8*)&pTab[1]; + memcpy(pTab->abPK, abPK, nCol); + pTab->zName = (char*)&pTab->abPK[nCol]; + memcpy(pTab->zName, zNew, nNew+1); *ppTabList = pTab; }else if( pTab->nCol!=nCol || memcmp(pTab->abPK, abPK, nCol) ){ rc = SQLITE_SCHEMA; break; } - pTab->zName = (char *)zNew; } if( sessionGrowHash(bPatchset, pTab) ){ diff --git a/ext/session/sqlite3session.h b/ext/session/sqlite3session.h index ced984ecbe..4737bd953f 100644 --- a/ext/session/sqlite3session.h +++ b/ext/session/sqlite3session.h @@ -273,6 +273,31 @@ int sqlite3session_changeset( void **ppChangeset /* OUT: Buffer containing changeset */ ); + +/* +** This function is similar to sqlite3session_changeset(), except that instead +** of storing the output changeset in a buffer obtained from sqlite3_malloc() +** it invokes the supplied xOutput() callback zero or more times to stream the +** changeset to the application. This is useful in order to avoid large memory +** allocations when working with very large changesets. +** +** The first parameter passed to each call to the xOutput callback is a copy +** of the pOut parameter passed to this function. The following two parameters +** are a pointer to the buffer containing the next chunk of the output changeset +** and the size of that buffer in bytes. +** +** If the data is successfully processed by the xOutput callback, it should +** return SQLITE_OK. Or, if an error occurs, some other SQLite error code. In +** this case the sqlite3session_changeset_str() call is abandoned immediately +** and returns a copy of the xOutput return code. +*/ +int sqlite3session_changeset_str( + sqlite3_session *pSession, + int (*xOutput)(void *pOut, const void *pData, int nData), + void *pOut +); + + /* ** CAPI3REF: Generate A Patchset From A Session Object ** @@ -302,6 +327,15 @@ int sqlite3session_patchset( void **ppPatchset /* OUT: Buffer containing changeset */ ); +/* +** Streaming version of sqlite3session_patchset(). +*/ +int sqlite3session_patchset_str( + sqlite3_session *pSession, + int (*xOutput)(void *pOut, const void *pData, int nData), + void *pOut +); + /* ** CAPI3REF: Test if a changeset has recorded any changes. ** @@ -358,6 +392,30 @@ int sqlite3changeset_start( void *pChangeset /* Pointer to blob containing changeset */ ); + +/* +** This function is similar to sqlite3changeset_start(), except that instead +** of reading data from a single buffer, it requests it one chunk at a time +** from the application by invoking the supplied xInput() callback. The xInput() +** callback may be invoked at any time during the lifetime of the iterator. +** +** Each time the xInput callback is invoked, the first argument passed is a +** copy of the third parameter passed to this function. The second argument, +** pData, points to a buffer (*pnData) bytes in size. Assuming no error occurs +** the xInput method should copy up to (*pnData) bytes of data into the buffer +** and set (*pnData) to the actual number of bytes copied before returning +** SQLITE_OK. If the input is completely exhausted, (*pnData) should be set +** to zero to indicate this. Or, if an error occurs, an SQLite error code +** should be returned. In this case the iterator is put into an error state and +** all subsequent calls to iterator methods return a copy of the xInput error +** code. +*/ +int sqlite3changeset_start_str( + sqlite3_changeset_iter **pp, + int (*xInput)(void *pIn, void *pData, int *pnData), + void *pIn +); + /* ** CAPI3REF: Advance A Changeset Iterator ** diff --git a/ext/session/test_session.c b/ext/session/test_session.c index 38e4be1481..bf4bde39a4 100644 --- a/ext/session/test_session.c +++ b/ext/session/test_session.c @@ -14,6 +14,23 @@ struct TestSession { Tcl_Obj *pFilterScript; }; +#define SESSION_STREAM_TCL_VAR "sqlite3session_streams" + +/* +** Attempt to find the global variable zVar within interpreter interp +** and extract a boolean value from it. Return this value. +** +** If the named variable cannot be found, or if it cannot be interpreted +** as a boolean, return 0. +*/ +static int test_tcl_boolean(Tcl_Interp *interp, const char *zVar){ + Tcl_Obj *pObj; + int bVal = 0; + pObj = Tcl_ObjGetVar2(interp, Tcl_NewStringObj(zVar, -1), 0, TCL_GLOBAL_ONLY); + if( pObj ) Tcl_GetBooleanFromObj(0, pObj, &bVal); + return bVal; +} + static int test_session_error(Tcl_Interp *interp, int rc){ extern const char *sqlite3ErrName(int); Tcl_SetObjResult(interp, Tcl_NewStringObj(sqlite3ErrName(rc), -1)); @@ -44,12 +61,38 @@ static int test_table_filter(void *pCtx, const char *zTbl){ return bRes; } +struct TestSessionsBlob { + void *p; + int n; +}; +typedef struct TestSessionsBlob TestSessionsBlob; + +static int testSessionsOutput( + void *pCtx, + const void *pData, + int nData +){ + TestSessionsBlob *pBlob = (TestSessionsBlob*)pCtx; + char *pNew; + + assert( nData>0 ); + pNew = (char*)sqlite3_realloc(pBlob->p, pBlob->n + nData); + if( pNew==0 ){ + return SQLITE_NOMEM; + } + pBlob->p = (void*)pNew; + memcpy(&pNew[pBlob->n], pData, nData); + pBlob->n += nData; + return SQLITE_OK; +} + /* ** Tclcmd: $session attach TABLE ** $session changeset ** $session delete ** $session enable BOOL ** $session indirect INTEGER +** $session patchset ** $session table_filter SCRIPT */ static int test_session_cmd( @@ -105,17 +148,26 @@ static int test_session_cmd( case 7: /* patchset */ case 1: { /* changeset */ - int nChange; - void *pChange; - if( iSub==7 ){ - rc = sqlite3session_patchset(pSession, &nChange, &pChange); + TestSessionsBlob o = {0, 0}; + if( test_tcl_boolean(interp, SESSION_STREAM_TCL_VAR) ){ + void *pCtx = (void*)&o; + if( iSub==7 ){ + rc = sqlite3session_patchset_str(pSession, testSessionsOutput, pCtx); + }else{ + rc = sqlite3session_changeset_str(pSession, testSessionsOutput, pCtx); + } }else{ - rc = sqlite3session_changeset(pSession, &nChange, &pChange); + if( iSub==7 ){ + rc = sqlite3session_patchset(pSession, &o.n, &o.p); + }else{ + rc = sqlite3session_changeset(pSession, &o.n, &o.p); + } } if( rc==SQLITE_OK ){ - Tcl_SetObjResult(interp, Tcl_NewByteArrayObj(pChange, nChange)); - sqlite3_free(pChange); - }else{ + Tcl_SetObjResult(interp, Tcl_NewByteArrayObj(o.p, o.n)); + } + sqlite3_free(o.p); + if( rc!=SQLITE_OK ){ return test_session_error(interp, rc); } break; diff --git a/manifest b/manifest index ffe55892fc..753640048a 100644 --- a/manifest +++ b/manifest @@ -1,5 +1,5 @@ -C Merge\sall\srecent\strunk\schanges\sinto\sthe\ssessions\sbranch. -D 2014-09-21T22:49:20.257 +C Begin\sadding\s'streaming'\sAPIs\sto\ssessions\smodule.\sThis\sis\sa\swork\sin\sprogress. +D 2014-09-23T20:39:55.903 F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f F Makefile.in dd5f245aa8c741bc65845747203c8ce2f3fb6c83 F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23 @@ -158,9 +158,9 @@ F ext/session/sessionA.test eb05c13e4ef1ca8046a3a6dbf2d5f6f5b04a11d4 F ext/session/sessionB.test 276267cd7fc37c2e2dd03f1e2ed9ada336a8bdb4 F ext/session/session_common.tcl 1539d8973b2aea0025c133eb0cc4c89fcef541a5 F ext/session/sessionfault.test e7965159a73d385c1a4af12d82c3a039ebdd71a6 -F ext/session/sqlite3session.c 4c7689bd8286147f7d9bf5d4b6ca5e7e7ee588ab -F ext/session/sqlite3session.h 66c14a2f6193c47773770307636e88c43db6f839 -F ext/session/test_session.c a252fb669d3a1b3552ee7b87fe610debc0afeb7b +F ext/session/sqlite3session.c ead909b1b0976aa6d08dcb7b487a902e358f7e4c +F ext/session/sqlite3session.h d074a929d368b438d32c15af8f8fe2afa80afe3f +F ext/session/test_session.c e39119c8554fe1b0925a038423ca137ddf6f6bd9 F ext/userauth/sqlite3userauth.h 19cb6f0e31316d0ee4afdfb7a85ef9da3333a220 F ext/userauth/user-auth.txt e6641021a9210364665fe625d067617d03f27b04 F ext/userauth/userauth.c 5fa3bdb492f481bbc1709fc83c91ebd13460c69e @@ -776,7 +776,7 @@ F test/pagesize.test 1dd51367e752e742f58e861e65ed7390603827a0 F test/pcache.test b09104b03160aca0d968d99e8cd2c5b1921a993d F test/pcache2.test a83efe2dec0d392f814bfc998def1d1833942025 F test/percentile.test b98fc868d71eb5619d42a1702e9ab91718cbed54 -F test/permutations.test 89f594fdba922586d46c3e0a7ab4990b5a7f8da7 +F test/permutations.test b8ca6c9ecec6f360485a8cb61ef1b8734b31797b F test/pragma.test 19d0241a007bcdd77fc2606ec60fc60357e7fc8b F test/pragma2.test aea7b3d82c76034a2df2b38a13745172ddc0bc13 F test/printf.test ec9870c4dce8686a37818e0bf1aba6e6a1863552 @@ -1216,7 +1216,7 @@ F tool/vdbe_profile.tcl 67746953071a9f8f2f668b73fe899074e2c6d8c1 F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4 F tool/warnings.sh 0abfd78ceb09b7f7c27c688c8e3fe93268a13b32 F tool/win/sqlite.vsix deb315d026cc8400325c5863eef847784a219a2f -P c2885c6bb24cc55178467e57e77bf71df58b3b13 d5880abd63c83c88e135257373afa0a3fd88297e -R 9d7f30c83131a55806d3c1dd053387df -U drh -Z c53e54ee2e290caf615c330d235d140a +P 6406b77f2c447751a2fbb16f01c61cdcfd6af59e +R 67d8884fcab24bc7ffc4823662b37f4a +U dan +Z 2bbe541c87040990ceab83529c1e7e80 diff --git a/manifest.uuid b/manifest.uuid index a34006bb05..424c64b57a 100644 --- a/manifest.uuid +++ b/manifest.uuid @@ -1 +1 @@ -6406b77f2c447751a2fbb16f01c61cdcfd6af59e \ No newline at end of file +3c7d3d950bbf5f5ed3696ebc61c77ca48bafe2b5 \ No newline at end of file diff --git a/test/permutations.test b/test/permutations.test index 7bea39eb7a..44f571793c 100644 --- a/test/permutations.test +++ b/test/permutations.test @@ -938,6 +938,14 @@ test_suite "session_eec" -description { sqlite3_extended_result_codes $::dbhandle 1 } +test_suite "session_str" -description { + All session module related tests using the streaming APIs. +} -files [ + glob -nocomplain $::testdir/../ext/session/*.test +] -dbconfig { + set ::sqlite3session_streams 1 +} + test_suite "no_optimization" -description { Run test scripts with optimizations disabled using the sqlite3_test_control(SQLITE_TESTCTRL_OPTIMIZATIONS) interface.