diff --git a/ext/session/sqlite3session.c b/ext/session/sqlite3session.c index ec0387e775..ceea8268c6 100644 --- a/ext/session/sqlite3session.c +++ b/ext/session/sqlite3session.c @@ -17,7 +17,11 @@ typedef struct SessionInput SessionInput; /* ** Minimum chunk size used by streaming versions of functions. */ +#ifdef SQLITE_TEST +#define SESSIONS_STR_CHUNK_SIZE 1 +#else #define SESSIONS_STR_CHUNK_SIZE 1024 +#endif /* ** Session handle structure. @@ -51,9 +55,10 @@ struct SessionBuffer { ** a stream function (sqlite3changeset_start_str()). */ struct SessionInput { - int iNext; /* Offset in aChangeset[] of next change */ - u8 *aChangeset; /* Pointer to buffer containing changeset */ - int nChangeset; /* Number of bytes in aChangeset */ + int iNext; /* Offset in aData[] of next change */ + u8 *aData; /* Pointer to buffer containing changeset */ + int nData; /* Number of bytes in aData */ + SessionBuffer buf; /* Current read buffer */ int (*xInput)(void*, void*, int*); /* Input stream call (or NULL) */ void *pIn; /* First argument to xInput */ @@ -2033,8 +2038,8 @@ int sessionChangesetStart( pRet = (sqlite3_changeset_iter *)sqlite3_malloc(nByte); if( !pRet ) return SQLITE_NOMEM; memset(pRet, 0, sizeof(sqlite3_changeset_iter)); - pRet->in.aChangeset = (u8 *)pChangeset; - pRet->in.nChangeset = nChangeset; + pRet->in.aData = (u8 *)pChangeset; + pRet->in.nData = nChangeset; pRet->in.xInput = xInput; pRet->in.pIn = pIn; pRet->in.iNext = 0; @@ -2074,10 +2079,31 @@ int sqlite3changeset_start_str( ** ** Return an SQLite error code if an error occurs, or SQLITE_OK otherwise. */ -static int sessionInputBuffer(SessionInput *pInput, int nByte){ +static int sessionInputBuffer(SessionInput *pIn, int nByte){ int rc = SQLITE_OK; - if( pInput->xInput && !pInput->bEof ){ - assert( 0 ); + if( pIn->xInput ){ + while( !pIn->bEof && (pIn->iNext+nByte)>=pIn->nData && rc==SQLITE_OK ){ + int nNew = SESSIONS_STR_CHUNK_SIZE; + + if( pIn->iNext>=SESSIONS_STR_CHUNK_SIZE ){ + int nMove = pIn->buf.nBuf - pIn->iNext; + memmove(pIn->buf.aBuf, &pIn->buf.aBuf[pIn->iNext], nMove); + pIn->buf.nBuf -= pIn->iNext; + pIn->iNext = 0; + } + + if( SQLITE_OK==sessionBufferGrow(&pIn->buf, nNew, &rc) ){ + rc = pIn->xInput(pIn->pIn, &pIn->buf.aBuf[pIn->buf.nBuf], &nNew); + if( nNew==0 ){ + pIn->bEof = 1; + }else{ + pIn->buf.nBuf += nNew; + } + } + + pIn->aData = pIn->buf.aBuf; + pIn->nData = pIn->buf.nBuf; + } } return rc; } @@ -2107,6 +2133,25 @@ static void sessionSkipRecord( *ppRec = aRec; } +/* +** This function sets the value of the sqlite3_value object passed as the +** first argument to a copy of the string or blob held in the aData[] +** buffer. SQLITE_OK is returned if successful, or SQLITE_NOMEM if an OOM +** error occurs. +*/ +static int sessionValueSetStr( + sqlite3_value *pVal, /* Set the value of this object */ + u8 *aData, /* Buffer containing string or blob data */ + int nData, /* Size of buffer aData[] in bytes */ + u8 enc /* String encoding (0 for blobs) */ +){ + u8 *aCopy = sqlite3_malloc(nData); + if( aCopy==0 ) return SQLITE_NOMEM; + memcpy(aCopy, aData, nData); + sqlite3ValueSetStr(pVal, nData, (char*)aCopy, enc, sqlite3_free); + return SQLITE_OK; +} + /* ** Deserialize a single record from a buffer in memory. See "RECORD FORMAT" ** for details. @@ -2145,7 +2190,7 @@ static int sessionReadRecord( if( abPK && abPK[i]==0 ) continue; rc = sessionInputBuffer(pIn, 9); if( rc==SQLITE_OK ){ - eType = pIn->aChangeset[pIn->iNext++]; + eType = pIn->aData[pIn->iNext++]; } assert( !apOut || apOut[i]==0 ); @@ -2157,15 +2202,14 @@ static int sessionReadRecord( } if( rc==SQLITE_OK ){ - u8 *aVal = &pIn->aChangeset[pIn->iNext]; + u8 *aVal = &pIn->aData[pIn->iNext]; if( eType==SQLITE_TEXT || eType==SQLITE_BLOB ){ int nByte; pIn->iNext += sessionVarintGet(aVal, &nByte); rc = sessionInputBuffer(pIn, nByte); if( apOut && rc==SQLITE_OK ){ - u8 *aRec = &pIn->aChangeset[pIn->iNext]; u8 enc = (eType==SQLITE_TEXT ? SQLITE_UTF8 : 0); - sqlite3ValueSetStr(apOut[i], nByte, (char *)aRec, enc, SQLITE_STATIC); + rc = sessionValueSetStr(apOut[i],&pIn->aData[pIn->iNext],nByte,enc); } pIn->iNext += nByte; } @@ -2204,20 +2248,23 @@ static int sessionReadRecord( static int sessionChangesetBufferTblhdr(SessionInput *pIn, int *pnByte){ int rc = SQLITE_OK; int nCol = 0; - int iIn = pIn->iNext; + int nRead = 0; rc = sessionInputBuffer(pIn, 9); if( rc==SQLITE_OK ){ - iIn += sessionVarintGet(&pIn->aChangeset[iIn], &nCol); - rc = sessionInputBuffer(pIn, nCol+100); - iIn += nCol; + nRead += sessionVarintGet(&pIn->aData[pIn->iNext + nRead], &nCol); + rc = sessionInputBuffer(pIn, nRead+nCol+100); + nRead += nCol; } + while( rc==SQLITE_OK ){ - while( iInnChangeset && pIn->aChangeset[iIn] ) iIn++; - if( pIn->aChangeset[iIn]==0 ) break; - rc = sessionInputBuffer(pIn, 100); + while( (pIn->iNext + nRead)nData && pIn->aData[pIn->iNext + nRead] ){ + nRead++; + } + if( pIn->aData[pIn->iNext + nRead]==0 ) break; + rc = sessionInputBuffer(pIn, nRead + 100); } - if( pnByte ) *pnByte = (iIn+1 - pIn->iNext); + if( pnByte ) *pnByte = nRead+1; return rc; } @@ -2238,7 +2285,7 @@ static int sessionChangesetReadTblhdr(sqlite3_changeset_iter *p){ if( rc==SQLITE_OK ){ int nByte; int nVarint; - nVarint = sessionVarintGet(&p->in.aChangeset[p->in.iNext], &p->nCol); + nVarint = sessionVarintGet(&p->in.aData[p->in.iNext], &p->nCol); nCopy -= nVarint; p->in.iNext += nVarint; nByte = p->nCol * sizeof(sqlite3_value*) * 2 + nCopy; @@ -2249,7 +2296,7 @@ static int sessionChangesetReadTblhdr(sqlite3_changeset_iter *p){ if( rc==SQLITE_OK ){ int iPK = sizeof(sqlite3_value*)*p->nCol*2; memset(p->tblhdr.aBuf, 0, iPK); - memcpy(&p->tblhdr.aBuf[iPK], &p->in.aChangeset[p->in.iNext], nCopy); + memcpy(&p->tblhdr.aBuf[iPK], &p->in.aData[p->in.iNext], nCopy); p->in.iNext += nCopy; } @@ -2305,25 +2352,25 @@ static int sessionChangesetNext( if( p->rc!=SQLITE_OK ) return p->rc; /* If the iterator is already at the end of the changeset, return DONE. */ - if( p->in.iNext>=p->in.nChangeset ){ + if( p->in.iNext>=p->in.nData ){ return SQLITE_DONE; } - op = p->in.aChangeset[p->in.iNext++]; + op = p->in.aData[p->in.iNext++]; if( op=='T' || op=='P' ){ p->bPatchset = (op=='P'); if( sessionChangesetReadTblhdr(p) ) return p->rc; if( (p->rc = sessionInputBuffer(&p->in, 2)) ) return p->rc; - op = p->in.aChangeset[p->in.iNext++]; + op = p->in.aData[p->in.iNext++]; } p->op = op; - p->bIndirect = p->in.aChangeset[p->in.iNext++]; + p->bIndirect = p->in.aData[p->in.iNext++]; if( p->op!=SQLITE_UPDATE && p->op!=SQLITE_DELETE && p->op!=SQLITE_INSERT ){ - return (p->rc = SQLITE_CORRUPT); + return (p->rc = SQLITE_CORRUPT_BKPT); } - if( paRec ){ *paRec = &p->in.aChangeset[p->in.iNext]; } + if( paRec ){ *paRec = &p->in.aData[p->in.iNext]; } /* If this is an UPDATE or DELETE, read the old.* record. */ if( p->op!=SQLITE_INSERT && (p->bPatchset==0 || p->op==SQLITE_DELETE) ){ @@ -2340,7 +2387,7 @@ static int sessionChangesetNext( } if( pnRec ){ - *pnRec = (int)(&p->in.aChangeset[p->in.iNext] - *paRec); + *pnRec = (int)(&p->in.aData[p->in.iNext] - *paRec); }else if( p->bPatchset && p->op==SQLITE_UPDATE ){ /* If this is an UPDATE that is part of a patchset, then all PK and ** modified fields are present in the new.* record. The old.* record @@ -2530,6 +2577,7 @@ int sqlite3changeset_finalize(sqlite3_changeset_iter *p){ for(i=0; inCol*2; i++) sqlite3ValueFree(p->apValue[i]); } sqlite3_free(p->tblhdr.aBuf); + sqlite3_free(p->in.buf.aBuf); sqlite3_free(p); return rc; } @@ -2560,8 +2608,8 @@ int sqlite3changeset_invert( /* Set up the input stream */ memset(&sInput, 0, sizeof(SessionInput)); - sInput.nChangeset = nChangeset; - sInput.aChangeset = (u8*)pChangeset; + sInput.nData = nChangeset; + sInput.aData = (u8*)pChangeset; aOut = (u8 *)sqlite3_malloc(nChangeset); if( !aOut ) return SQLITE_NOMEM; @@ -2571,7 +2619,7 @@ int sqlite3changeset_invert( while( inData - p->iData; /* Bytes of data available */ + int nRet = p->nStream; /* Bytes actually returned */ + + if( nRet>nReq ) nRet = nReq; + if( nRet>nRem ) nRet = nRem; + + assert( nRet>=0 ); + if( nRet>0 ){ + memcpy(pData, &p->aData[p->iData], nRet); + p->iData += nRet; + } + + *pnData = nRet; + return SQLITE_OK; +} + + /* ** sqlite3changeset_apply DB CHANGESET CONFLICT-SCRIPT ?FILTER-SCRIPT? */ @@ -559,6 +591,10 @@ static int test_sqlite3changeset_apply( void *pChangeset; /* Buffer containing changeset */ int nChangeset; /* Size of buffer aChangeset in bytes */ TestConflictHandler ctx; + TestStreamInput sStr; + + memset(&sStr, 0, sizeof(sStr)); + sStr.nStream = test_tcl_integer(interp, SESSION_STREAM_TCL_VAR); if( objc!=4 && objc!=5 ){ Tcl_WrongNumArgs(interp, 1, objv, @@ -576,9 +612,18 @@ static int test_sqlite3changeset_apply( ctx.pFilterScript = objc==5 ? objv[4] : 0; ctx.interp = interp; - rc = sqlite3changeset_apply(db, nChangeset, pChangeset, - (objc==5) ? test_filter_handler : 0, test_conflict_handler, (void *)&ctx - ); + if( sStr.nStream==0 ){ + rc = sqlite3changeset_apply(db, nChangeset, pChangeset, + (objc==5) ? test_filter_handler : 0, test_conflict_handler, (void *)&ctx + ); + }else{ + sStr.aData = (unsigned char*)pChangeset; + sStr.nData = nChangeset; + rc = sqlite3changeset_apply_str(db, testStreamInput, (void*)&sStr, + (objc==5) ? test_filter_handler : 0, test_conflict_handler, (void *)&ctx + ); + } + if( rc!=SQLITE_OK ){ return test_session_error(interp, rc); } @@ -632,7 +677,7 @@ static int test_sqlite3changeset_invert( ){ int rc; /* Return code from changeset_invert() */ void *aChangeset; /* Input changeset */ - int nChangeSet; /* Size of buffer aChangeset in bytes */ + int nChangeset; /* Size of buffer aChangeset in bytes */ void *aOut; /* Output changeset */ int nOut; /* Size of buffer aOut in bytes */ @@ -640,9 +685,9 @@ static int test_sqlite3changeset_invert( Tcl_WrongNumArgs(interp, 1, objv, "CHANGESET"); return TCL_ERROR; } - aChangeset = (void *)Tcl_GetByteArrayFromObj(objv[1], &nChangeSet); + aChangeset = (void *)Tcl_GetByteArrayFromObj(objv[1], &nChangeset); - rc = sqlite3changeset_invert(nChangeSet, aChangeset, &nOut, &aOut); + rc = sqlite3changeset_invert(nChangeset, aChangeset, &nOut, &aOut); if( rc!=SQLITE_OK ){ return test_session_error(interp, rc); } @@ -693,8 +738,8 @@ static int test_sqlite3session_foreach( int objc, Tcl_Obj *CONST objv[] ){ - void *pChangeSet; - int nChangeSet; + void *pChangeset; + int nChangeset; sqlite3_changeset_iter *pIter; int rc; Tcl_Obj *pVarname; @@ -702,6 +747,9 @@ static int test_sqlite3session_foreach( Tcl_Obj *pScript; int isCheckNext = 0; + TestStreamInput sStr; + memset(&sStr, 0, sizeof(sStr)); + if( objc>1 ){ char *zOpt = Tcl_GetString(objv[1]); isCheckNext = (strcmp(zOpt, "-next")==0); @@ -715,8 +763,15 @@ static int test_sqlite3session_foreach( pCS = objv[2+isCheckNext]; pScript = objv[3+isCheckNext]; - pChangeSet = (void *)Tcl_GetByteArrayFromObj(pCS, &nChangeSet); - rc = sqlite3changeset_start(&pIter, nChangeSet, pChangeSet); + pChangeset = (void *)Tcl_GetByteArrayFromObj(pCS, &nChangeset); + sStr.nStream = test_tcl_integer(interp, SESSION_STREAM_TCL_VAR); + if( sStr.nStream==0 ){ + rc = sqlite3changeset_start(&pIter, nChangeset, pChangeset); + }else{ + sStr.aData = (unsigned char*)pChangeset; + sStr.nData = nChangeset; + rc = sqlite3changeset_start_str(&pIter, testStreamInput, (void*)&sStr); + } if( rc!=SQLITE_OK ){ return test_session_error(interp, rc); } diff --git a/manifest b/manifest index 753640048a..e7faacba4f 100644 --- a/manifest +++ b/manifest @@ -1,5 +1,5 @@ -C Begin\sadding\s'streaming'\sAPIs\sto\ssessions\smodule.\sThis\sis\sa\swork\sin\sprogress. -D 2014-09-23T20:39:55.903 +C Add\sstreaming\sversion\sof\ssqlite3changeset_apply().\sTests\sand\sfixes\sfor\sthe\ssame\sand\ssqlite3changeset_start_str(). +D 2014-09-24T17:13:20.331 F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f F Makefile.in dd5f245aa8c741bc65845747203c8ce2f3fb6c83 F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23 @@ -158,9 +158,9 @@ F ext/session/sessionA.test eb05c13e4ef1ca8046a3a6dbf2d5f6f5b04a11d4 F ext/session/sessionB.test 276267cd7fc37c2e2dd03f1e2ed9ada336a8bdb4 F ext/session/session_common.tcl 1539d8973b2aea0025c133eb0cc4c89fcef541a5 F ext/session/sessionfault.test e7965159a73d385c1a4af12d82c3a039ebdd71a6 -F ext/session/sqlite3session.c ead909b1b0976aa6d08dcb7b487a902e358f7e4c -F ext/session/sqlite3session.h d074a929d368b438d32c15af8f8fe2afa80afe3f -F ext/session/test_session.c e39119c8554fe1b0925a038423ca137ddf6f6bd9 +F ext/session/sqlite3session.c 1c653844900de41e175f77f22fe1af7abb05e798 +F ext/session/sqlite3session.h 7e7a31ad1992f6678a20654c9751dacd10384292 +F ext/session/test_session.c 77f1e7a269daeb60f82441ff859c812d686ef79d F ext/userauth/sqlite3userauth.h 19cb6f0e31316d0ee4afdfb7a85ef9da3333a220 F ext/userauth/user-auth.txt e6641021a9210364665fe625d067617d03f27b04 F ext/userauth/userauth.c 5fa3bdb492f481bbc1709fc83c91ebd13460c69e @@ -832,7 +832,7 @@ F test/selectD.test b0f02a04ef7737decb24e08be2c39b9664b43394 F test/selectE.test fc02a1eb04c8eb537091482644b7d778ae8759b7 F test/selectF.test 21c94e6438f76537b72532fa9fd4710cdd455fc3 F test/server1.test 46803bd3fe8b99b30dbc5ff38ffc756f5c13a118 -F test/session.test 082dea459efc76e2a527b8ee9ff74d76e63ea7b6 +F test/session.test 35f9c76809c26bee45d86891c7620b692ef9b8a8 F test/shared.test 1da9dbad400cee0d93f252ccf76e1ae007a63746 F test/shared2.test 03eb4a8d372e290107d34b6ce1809919a698e879 F test/shared3.test fcd65cb11d189eff5f5c85cc4fad246fb0933108 @@ -1216,7 +1216,7 @@ F tool/vdbe_profile.tcl 67746953071a9f8f2f668b73fe899074e2c6d8c1 F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4 F tool/warnings.sh 0abfd78ceb09b7f7c27c688c8e3fe93268a13b32 F tool/win/sqlite.vsix deb315d026cc8400325c5863eef847784a219a2f -P 6406b77f2c447751a2fbb16f01c61cdcfd6af59e -R 67d8884fcab24bc7ffc4823662b37f4a +P 3c7d3d950bbf5f5ed3696ebc61c77ca48bafe2b5 +R bba1026624291c9ce371985c2bc9cae0 U dan -Z 2bbe541c87040990ceab83529c1e7e80 +Z 40a11f75418a19b3b02aba54325bf64c diff --git a/manifest.uuid b/manifest.uuid index 424c64b57a..4b63e2e10a 100644 --- a/manifest.uuid +++ b/manifest.uuid @@ -1 +1 @@ -3c7d3d950bbf5f5ed3696ebc61c77ca48bafe2b5 \ No newline at end of file +b917fc146876f764442de08d5ec36e5b4cf5ab52 \ No newline at end of file diff --git a/test/session.test b/test/session.test index 85ac056cdd..da04ac45c5 100644 --- a/test/session.test +++ b/test/session.test @@ -16,6 +16,7 @@ ifcapable session { # again with it clear. run_test_suite session_eec run_test_suite session + run_test_suite session_str } finish_test