mirror of
https://github.com/sqlite/sqlite.git
synced 2025-08-08 14:02:16 +03:00
Add multi-threaded performance test program "tserver" to this branch. Fix bugs
in the begin-concurrent/wal2 integration revealed by the same. FossilOrigin-Name: 7bd3b35661d7d0e51113b9e4b15a0ab7f8e26edeafb43941ef5b44bb94df5109
This commit is contained in:
14
manifest
14
manifest
@@ -1,5 +1,5 @@
|
||||
C Add\stest\scases\sto\swal2concurrent.test.
|
||||
D 2018-12-05T18:25:23.042
|
||||
C Add\smulti-threaded\sperformance\stest\sprogram\s"tserver"\sto\sthis\sbranch.\sFix\sbugs\nin\sthe\sbegin-concurrent/wal2\sintegration\srevealed\sby\sthe\ssame.
|
||||
D 2018-12-07T20:25:14.317
|
||||
F .fossil-settings/empty-dirs dbb81e8fc0401ac46a1491ab34a7f2c7c0452f2f06b54ebb845d024ca8283ef1
|
||||
F .fossil-settings/ignore-glob 35175cdfcf539b2318cb04a9901442804be81cd677d8b889fcc9149c21f239ea
|
||||
F Makefile.in a050c8670ea0d7b37b2192306cbb50d392acd9902b84e9b56f3444d006f97a6c
|
||||
@@ -593,7 +593,7 @@ F src/vdbesort.c 90aad5a92608f2dd771c96749beabdb562c9d881131a860a7a5bccf66dc3be7
|
||||
F src/vdbetrace.c 79d6dbbc479267b255a7de8080eee6e729928a0ef93ed9b0bfa5618875b48392
|
||||
F src/vtab.c 70188a745dc4e57d26e942681ff4b2912b7c8249ad5de3f60f0677b4337bcfaa
|
||||
F src/vxworks.h d2988f4e5a61a4dfe82c6524dd3d6e4f2ce3cdb9
|
||||
F src/wal.c 43a7e48cea522cbb8eaf198b5f47ec4aea3bf89ae41c65a5fa56d77c6b2125e0
|
||||
F src/wal.c 1a564ef45c6443c52d1c576d0b7b9fb969a91457ed2cf286d98d900e88a5f73f
|
||||
F src/wal.h c398e0269e8f37495cedb63b5e288c2aac6f6d103d05fb55f4affec21311615d
|
||||
F src/walker.c fb94aadc9099ff9c6506d0a8b88d51266005bcaa265403f3d7caf732a562eb66
|
||||
F src/where.c 3818e8a736a05d2cb194e64399af707e367fbcc5c251d785804d02eaf121288e
|
||||
@@ -1769,6 +1769,8 @@ F tool/srcck1.c 371de5363b70154012955544f86fdee8f6e5326f
|
||||
F tool/stack_usage.tcl f8e71b92cdb099a147dad572375595eae55eca43
|
||||
F tool/symbols-mingw.sh 4dbcea7e74768305384c9fd2ed2b41bbf9f0414d
|
||||
F tool/symbols.sh c5a617b8c61a0926747a56c65f5671ef8ac0e148
|
||||
F tool/tserver.c 46ef565f79b326a86b696ae255548cf841fafae10dda7b6a2027b323f4ee6dac
|
||||
F tool/tserver_test.tcl 777e1e4f26c1bf47fecdb61ce46e1db36c209a81956ccfd9e2d957377b54974f
|
||||
F tool/varint.c 5d94cb5003db9dbbcbcc5df08d66f16071aee003
|
||||
F tool/vdbe-compress.tcl 5926c71f9c12d2ab73ef35c29376e756eb68361c
|
||||
F tool/vdbe_profile.tcl 246d0da094856d72d2c12efec03250d71639d19f
|
||||
@@ -1796,7 +1798,7 @@ F vsixtest/vsixtest.tcl 6a9a6ab600c25a91a7acc6293828957a386a8a93
|
||||
F vsixtest/vsixtest.vcxproj.data 2ed517e100c66dc455b492e1a33350c1b20fbcdc
|
||||
F vsixtest/vsixtest.vcxproj.filters 37e51ffedcdb064aad6ff33b6148725226cd608e
|
||||
F vsixtest/vsixtest_TemporaryKey.pfx e5b1b036facdb453873e7084e1cae9102ccc67a0
|
||||
P 692ddc280850ea2ff5d2fa3743f85fb123b4e4ebc452aae9c58b8d80d22b75b1
|
||||
R 70d5ec81123343dc4b5125cf054cd873
|
||||
P ef9e7a87a4ce4fe9596aed2f880f2b9fc3526c00c48f2868c625a4c24be65abe
|
||||
R d4106d386120e66ef9e2fbd9ae6bf9bf
|
||||
U dan
|
||||
Z e68147c0d113bb85826459be77b6127e
|
||||
Z 4d7f96aaf6dd169d1c2711123641179c
|
||||
|
@@ -1 +1 @@
|
||||
ef9e7a87a4ce4fe9596aed2f880f2b9fc3526c00c48f2868c625a4c24be65abe
|
||||
7bd3b35661d7d0e51113b9e4b15a0ab7f8e26edeafb43941ef5b44bb94df5109
|
16
src/wal.c
16
src/wal.c
@@ -2435,7 +2435,10 @@ static int walCheckpoint(
|
||||
}
|
||||
|
||||
/* Release the reader lock held while backfilling */
|
||||
walUnlockExclusive(pWal, WAL_READ_LOCK(bWal2 ? 1 + iCkpt*2 : 0), 1);
|
||||
if( bWal2 ){
|
||||
walUnlockExclusive(pWal, WAL_READ_LOCK(1 + iCkpt*2), 1);
|
||||
}
|
||||
walUnlockExclusive(pWal, WAL_READ_LOCK(0), 1);
|
||||
}
|
||||
|
||||
if( rc==SQLITE_BUSY ){
|
||||
@@ -3861,11 +3864,16 @@ int sqlite3WalLockForCommit(
|
||||
u8 aNew[4];
|
||||
u8 *aOld = &((u8*)pPage1->pData)[40];
|
||||
int sz;
|
||||
i64 iOffset;
|
||||
i64 iOff;
|
||||
int iFrame = sLoc.iZero + i;
|
||||
int iWal = 0;
|
||||
if( bWal2 ){
|
||||
iWal = walExternalDecode(iFrame, &iFrame);
|
||||
}
|
||||
sz = pWal->hdr.szPage;
|
||||
sz = (sz&0xfe00) + ((sz&0x0001)<<16);
|
||||
iOffset = walFrameOffset(i+sLoc.iZero, sz)+WAL_FRAME_HDRSIZE+40;
|
||||
rc = sqlite3OsRead(pWal->apWalFd[0], aNew,sizeof(aNew),iOffset);
|
||||
iOff = walFrameOffset(iFrame, sz) + WAL_FRAME_HDRSIZE + 40;
|
||||
rc = sqlite3OsRead(pWal->apWalFd[iWal],aNew,sizeof(aNew),iOff);
|
||||
if( rc==SQLITE_OK && memcmp(aOld, aNew, sizeof(aNew)) ){
|
||||
rc = SQLITE_BUSY_SNAPSHOT;
|
||||
}
|
||||
|
614
tool/tserver.c
Normal file
614
tool/tserver.c
Normal file
@@ -0,0 +1,614 @@
|
||||
/*
|
||||
** 2017 June 7
|
||||
**
|
||||
** The author disclaims copyright to this source code. In place of
|
||||
** a legal notice, here is a blessing:
|
||||
**
|
||||
** May you do good and not evil.
|
||||
** May you find forgiveness for yourself and forgive others.
|
||||
** May you share freely, never taking more than you give.
|
||||
**
|
||||
*************************************************************************
|
||||
**
|
||||
** Simple multi-threaded server used for informal testing of concurrency
|
||||
** between connections in different threads. Listens for tcp/ip connections
|
||||
** on port 9999 of the 127.0.0.1 interface only. To build:
|
||||
**
|
||||
** gcc -g $(TOP)/tool/tserver.c sqlite3.o -lpthread -o tserver
|
||||
**
|
||||
** To run using "x.db" as the db file:
|
||||
**
|
||||
** ./tserver x.db
|
||||
**
|
||||
** To connect, open a client socket on port 9999 and start sending commands.
|
||||
** Commands are either SQL - which must be terminated by a semi-colon, or
|
||||
** dot-commands, which must be terminated by a newline. If an SQL statement
|
||||
** is seen, it is prepared and added to an internal list.
|
||||
**
|
||||
** Dot-commands are:
|
||||
**
|
||||
** .list Display all SQL statements in the list.
|
||||
** .quit Disconnect.
|
||||
** .run Run all SQL statements in the list.
|
||||
** .repeats N Configure the number of repeats per ".run".
|
||||
** .seconds N Configure the number of seconds to ".run" for.
|
||||
** .mutex_commit Add a "COMMIT" protected by a g.commit_mutex
|
||||
** to the current SQL.
|
||||
** .stop Stop the tserver process - exit(0).
|
||||
**
|
||||
** Example input:
|
||||
**
|
||||
** BEGIN;
|
||||
** INSERT INTO t1 VALUES(randomblob(10), randomblob(100));
|
||||
** INSERT INTO t1 VALUES(randomblob(10), randomblob(100));
|
||||
** INSERT INTO t1 VALUES(randomblob(10), randomblob(100));
|
||||
** COMMIT;
|
||||
** .repeats 100000
|
||||
** .run
|
||||
**
|
||||
*/
|
||||
#define TSERVER_PORTNUMBER 9999
|
||||
|
||||
#include <arpa/inet.h>
|
||||
#include <assert.h>
|
||||
#include <pthread.h>
|
||||
#include <signal.h>
|
||||
#include <stdint.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <sys/socket.h>
|
||||
#include <sys/time.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include "sqlite3.h"
|
||||
|
||||
#define TSERVER_DEFAULT_CHECKPOINT_THRESHOLD 3900
|
||||
|
||||
/* Global variables */
|
||||
struct TserverGlobal {
|
||||
char *zDatabaseName; /* Database used by this server */
|
||||
char *zVfs;
|
||||
sqlite3_mutex *commit_mutex;
|
||||
sqlite3 *db; /* Global db handle */
|
||||
|
||||
/* The following use native pthreads instead of a portable interface. This
|
||||
** is because a condition variable, as well as a mutex, is required. */
|
||||
pthread_mutex_t ckpt_mutex;
|
||||
pthread_cond_t ckpt_cond;
|
||||
int nThreshold; /* Checkpoint when wal is this large */
|
||||
int bCkptRequired; /* True if wal checkpoint is required */
|
||||
int nRun; /* Number of clients in ".run" */
|
||||
int nWait; /* Number of clients waiting on ckpt_cond */
|
||||
};
|
||||
|
||||
static struct TserverGlobal g = {0};
|
||||
|
||||
typedef struct ClientSql ClientSql;
|
||||
struct ClientSql {
|
||||
sqlite3_stmt *pStmt;
|
||||
int bMutex;
|
||||
};
|
||||
|
||||
typedef struct ClientCtx ClientCtx;
|
||||
struct ClientCtx {
|
||||
sqlite3 *db; /* Database handle for this client */
|
||||
int fd; /* Client fd */
|
||||
int nRepeat; /* Number of times to repeat SQL */
|
||||
int nSecond; /* Number of seconds to run for */
|
||||
ClientSql *aPrepare; /* Array of prepared statements */
|
||||
int nPrepare; /* Valid size of apPrepare[] */
|
||||
int nAlloc; /* Allocated size of apPrepare[] */
|
||||
|
||||
int nClientThreshold; /* Threshold for checkpointing */
|
||||
int bClientCkptRequired; /* True to do a checkpoint */
|
||||
};
|
||||
|
||||
static int is_eol(int i){
|
||||
return (i=='\n' || i=='\r');
|
||||
}
|
||||
static int is_whitespace(int i){
|
||||
return (i==' ' || i=='\t' || is_eol(i));
|
||||
}
|
||||
|
||||
/*
|
||||
** Implementation of SQL scalar function usleep().
|
||||
*/
|
||||
static void usleepFunc(
|
||||
sqlite3_context *context,
|
||||
int argc,
|
||||
sqlite3_value **argv
|
||||
){
|
||||
int nUs;
|
||||
sqlite3_vfs *pVfs = (sqlite3_vfs*)sqlite3_user_data(context);
|
||||
assert( argc==1 );
|
||||
nUs = sqlite3_value_int64(argv[0]);
|
||||
pVfs->xSleep(pVfs, nUs);
|
||||
}
|
||||
|
||||
static void trim_string(const char **pzStr, int *pnStr){
|
||||
const char *zStr = *pzStr;
|
||||
int nStr = *pnStr;
|
||||
|
||||
while( nStr>0 && is_whitespace(zStr[0]) ){
|
||||
zStr++;
|
||||
nStr--;
|
||||
}
|
||||
while( nStr>0 && is_whitespace(zStr[nStr-1]) ){
|
||||
nStr--;
|
||||
}
|
||||
|
||||
*pzStr = zStr;
|
||||
*pnStr = nStr;
|
||||
}
|
||||
|
||||
static int send_message(ClientCtx *p, const char *zFmt, ...){
|
||||
char *zMsg;
|
||||
va_list ap; /* Vararg list */
|
||||
va_start(ap, zFmt);
|
||||
int res = -1;
|
||||
|
||||
zMsg = sqlite3_vmprintf(zFmt, ap);
|
||||
if( zMsg ){
|
||||
res = write(p->fd, zMsg, strlen(zMsg));
|
||||
}
|
||||
sqlite3_free(zMsg);
|
||||
va_end(ap);
|
||||
|
||||
return (res<0);
|
||||
}
|
||||
|
||||
static int handle_some_sql(ClientCtx *p, const char *zSql, int nSql){
|
||||
const char *zTail = zSql;
|
||||
int nTail = nSql;
|
||||
int rc = SQLITE_OK;
|
||||
|
||||
while( rc==SQLITE_OK ){
|
||||
if( p->nPrepare>=p->nAlloc ){
|
||||
int nByte = (p->nPrepare+32) * sizeof(ClientSql);
|
||||
ClientSql *aNew = sqlite3_realloc(p->aPrepare, nByte);
|
||||
if( aNew ){
|
||||
memset(&aNew[p->nPrepare], 0, sizeof(ClientSql)*32);
|
||||
p->aPrepare = aNew;
|
||||
p->nAlloc = p->nPrepare+32;
|
||||
}else{
|
||||
rc = SQLITE_NOMEM;
|
||||
break;
|
||||
}
|
||||
}
|
||||
rc = sqlite3_prepare_v2(
|
||||
p->db, zTail, nTail, &p->aPrepare[p->nPrepare].pStmt, &zTail
|
||||
);
|
||||
if( rc!=SQLITE_OK ){
|
||||
send_message(p, "error - %s (eec=%d)\n", sqlite3_errmsg(p->db),
|
||||
sqlite3_extended_errcode(p->db)
|
||||
);
|
||||
rc = 1;
|
||||
break;
|
||||
}
|
||||
if( p->aPrepare[p->nPrepare].pStmt==0 ){
|
||||
break;
|
||||
}
|
||||
p->nPrepare++;
|
||||
nTail = nSql - (zTail-zSql);
|
||||
rc = send_message(p, "ok (%d SQL statements)\n", p->nPrepare);
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
static sqlite3_int64 get_timer(void){
|
||||
struct timeval t;
|
||||
gettimeofday(&t, 0);
|
||||
return ((sqlite3_int64)t.tv_usec / 1000) + ((sqlite3_int64)t.tv_sec * 1000);
|
||||
}
|
||||
|
||||
static void clear_sql(ClientCtx *p){
|
||||
int j;
|
||||
for(j=0; j<p->nPrepare; j++){
|
||||
sqlite3_finalize(p->aPrepare[j].pStmt);
|
||||
}
|
||||
p->nPrepare = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
** The sqlite3_wal_hook() callback used by all client database connections.
|
||||
*/
|
||||
static int clientWalHook(void *pArg, sqlite3 *db, const char *zDb, int nFrame){
|
||||
if( g.nThreshold>0 ){
|
||||
if( nFrame>=g.nThreshold ){
|
||||
g.bCkptRequired = 1;
|
||||
}
|
||||
}else{
|
||||
ClientCtx *pCtx = (ClientCtx*)pArg;
|
||||
if( pCtx->nClientThreshold && nFrame>=pCtx->nClientThreshold ){
|
||||
pCtx->bClientCkptRequired = 1;
|
||||
}
|
||||
}
|
||||
return SQLITE_OK;
|
||||
}
|
||||
|
||||
static int handle_run_command(ClientCtx *p){
|
||||
int i, j;
|
||||
int nBusy = 0;
|
||||
sqlite3_int64 t0 = get_timer();
|
||||
sqlite3_int64 t1 = t0;
|
||||
int nT1 = 0;
|
||||
int nTBusy1 = 0;
|
||||
int rc = SQLITE_OK;
|
||||
|
||||
pthread_mutex_lock(&g.ckpt_mutex);
|
||||
g.nRun++;
|
||||
pthread_mutex_unlock(&g.ckpt_mutex);
|
||||
|
||||
|
||||
for(j=0; (p->nRepeat<=0 || j<p->nRepeat) && rc==SQLITE_OK; j++){
|
||||
sqlite3_int64 t2;
|
||||
|
||||
for(i=0; i<p->nPrepare && rc==SQLITE_OK; i++){
|
||||
sqlite3_stmt *pStmt = p->aPrepare[i].pStmt;
|
||||
|
||||
/* If the bMutex flag is set, grab g.commit_mutex before executing
|
||||
** the SQL statement (which is always "COMMIT" in this case). */
|
||||
if( p->aPrepare[i].bMutex ){
|
||||
sqlite3_mutex_enter(g.commit_mutex);
|
||||
}
|
||||
|
||||
/* Execute the statement */
|
||||
while( sqlite3_step(pStmt)==SQLITE_ROW );
|
||||
rc = sqlite3_reset(pStmt);
|
||||
|
||||
/* Relinquish the g.commit_mutex mutex if required. */
|
||||
if( p->aPrepare[i].bMutex ){
|
||||
sqlite3_mutex_leave(g.commit_mutex);
|
||||
}
|
||||
|
||||
if( (rc & 0xFF)==SQLITE_BUSY ){
|
||||
if( sqlite3_get_autocommit(p->db)==0 ){
|
||||
sqlite3_exec(p->db, "ROLLBACK", 0, 0, 0);
|
||||
}
|
||||
nBusy++;
|
||||
rc = SQLITE_OK;
|
||||
break;
|
||||
}
|
||||
else if( rc!=SQLITE_OK ){
|
||||
send_message(p, "error - %s (eec=%d)\n", sqlite3_errmsg(p->db),
|
||||
sqlite3_extended_errcode(p->db)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
t2 = get_timer();
|
||||
if( t2>=(t1+1000) ){
|
||||
int nMs = (t2 - t1);
|
||||
int nDone = (j+1 - nBusy - nT1);
|
||||
|
||||
rc = send_message(
|
||||
p, "(%d done @ %d per second, %d busy)\n",
|
||||
nDone, (1000*nDone + nMs/2) / nMs, nBusy - nTBusy1
|
||||
);
|
||||
t1 = t2;
|
||||
nT1 = j+1 - nBusy;
|
||||
nTBusy1 = nBusy;
|
||||
if( p->nSecond>0 && (p->nSecond*1000)<=t1-t0 ) break;
|
||||
}
|
||||
|
||||
/* Global checkpoint handling. */
|
||||
if( g.nThreshold>0 ){
|
||||
pthread_mutex_lock(&g.ckpt_mutex);
|
||||
if( rc==SQLITE_OK && g.bCkptRequired ){
|
||||
if( g.nWait==g.nRun-1 ){
|
||||
/* All other clients are already waiting on the condition variable.
|
||||
** Run the checkpoint, signal the condition and move on. */
|
||||
rc = sqlite3_wal_checkpoint(p->db, "main");
|
||||
g.bCkptRequired = 0;
|
||||
pthread_cond_broadcast(&g.ckpt_cond);
|
||||
}else{
|
||||
assert( g.nWait<g.nRun-1 );
|
||||
g.nWait++;
|
||||
pthread_cond_wait(&g.ckpt_cond, &g.ckpt_mutex);
|
||||
g.nWait--;
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&g.ckpt_mutex);
|
||||
}
|
||||
|
||||
if( rc==SQLITE_OK && p->bClientCkptRequired ){
|
||||
rc = sqlite3_wal_checkpoint(p->db, "main");
|
||||
assert( rc==SQLITE_OK );
|
||||
p->bClientCkptRequired = 0;
|
||||
}
|
||||
}
|
||||
|
||||
if( rc==SQLITE_OK ){
|
||||
int nMs = (int)(get_timer() - t0);
|
||||
send_message(p, "ok (%d/%d SQLITE_BUSY)\n", nBusy, j);
|
||||
if( p->nRepeat<=0 ){
|
||||
send_message(p, "### ok %d busy %d ms %d\n", j-nBusy, nBusy, nMs);
|
||||
}
|
||||
}
|
||||
clear_sql(p);
|
||||
|
||||
pthread_mutex_lock(&g.ckpt_mutex);
|
||||
g.nRun--;
|
||||
pthread_mutex_unlock(&g.ckpt_mutex);
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
static int handle_dot_command(ClientCtx *p, const char *zCmd, int nCmd){
|
||||
int n;
|
||||
int rc = 0;
|
||||
const char *z = &zCmd[1];
|
||||
const char *zArg;
|
||||
int nArg;
|
||||
|
||||
assert( zCmd[0]=='.' );
|
||||
for(n=0; n<(nCmd-1); n++){
|
||||
if( is_whitespace(z[n]) ) break;
|
||||
}
|
||||
|
||||
zArg = &z[n];
|
||||
nArg = nCmd-n;
|
||||
trim_string(&zArg, &nArg);
|
||||
|
||||
if( n>=1 && n<=4 && 0==strncmp(z, "list", n) ){
|
||||
int i;
|
||||
for(i=0; rc==0 && i<p->nPrepare; i++){
|
||||
const char *zSql = sqlite3_sql(p->aPrepare[i].pStmt);
|
||||
int nSql = strlen(zSql);
|
||||
trim_string(&zSql, &nSql);
|
||||
rc = send_message(p, "%d: %.*s\n", i, nSql, zSql);
|
||||
}
|
||||
}
|
||||
|
||||
else if( n>=1 && n<=4 && 0==strncmp(z, "quit", n) ){
|
||||
rc = 1;
|
||||
}
|
||||
|
||||
else if( n>=2 && n<=7 && 0==strncmp(z, "repeats", n) ){
|
||||
if( nArg ){
|
||||
p->nRepeat = strtol(zArg, 0, 0);
|
||||
if( p->nRepeat>0 ) p->nSecond = 0;
|
||||
}
|
||||
rc = send_message(p, "ok (repeat=%d)\n", p->nRepeat);
|
||||
}
|
||||
|
||||
else if( n>=2 && n<=3 && 0==strncmp(z, "run", n) ){
|
||||
rc = handle_run_command(p);
|
||||
}
|
||||
|
||||
else if( n>=2 && n<=7 && 0==strncmp(z, "seconds", n) ){
|
||||
if( nArg ){
|
||||
p->nSecond = strtol(zArg, 0, 0);
|
||||
if( p->nSecond>0 ) p->nRepeat = 0;
|
||||
}
|
||||
rc = send_message(p, "ok (seconds=%d)\n", p->nSecond);
|
||||
}
|
||||
|
||||
else if( n>=1 && n<=12 && 0==strncmp(z, "mutex_commit", n) ){
|
||||
rc = handle_some_sql(p, "COMMIT;", 7);
|
||||
if( rc==SQLITE_OK ){
|
||||
p->aPrepare[p->nPrepare-1].bMutex = 1;
|
||||
}
|
||||
}
|
||||
|
||||
else if( n>=1 && n<=10 && 0==strncmp(z, "checkpoint", n) ){
|
||||
if( nArg ){
|
||||
p->nClientThreshold = strtol(zArg, 0, 0);
|
||||
}
|
||||
rc = send_message(p, "ok (checkpoint=%d)\n", p->nClientThreshold);
|
||||
}
|
||||
|
||||
else if( n>=2 && n<=4 && 0==strncmp(z, "stop", n) ){
|
||||
sqlite3_close(g.db);
|
||||
exit(0);
|
||||
}
|
||||
|
||||
else{
|
||||
send_message(p,
|
||||
"unrecognized dot command: %.*s\n"
|
||||
"should be \"list\", \"run\", \"repeats\", \"mutex_commit\", "
|
||||
"\"checkpoint\" or \"seconds\"\n", n, z
|
||||
);
|
||||
rc = 1;
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
static void *handle_client(void *pArg){
|
||||
char zCmd[32*1024]; /* Read buffer */
|
||||
int nCmd = 0; /* Valid bytes in zCmd[] */
|
||||
int res; /* Result of read() call */
|
||||
int rc = SQLITE_OK;
|
||||
|
||||
ClientCtx ctx;
|
||||
memset(&ctx, 0, sizeof(ClientCtx));
|
||||
|
||||
ctx.fd = (int)(intptr_t)pArg;
|
||||
ctx.nRepeat = 1;
|
||||
rc = sqlite3_open_v2(g.zDatabaseName, &ctx.db,
|
||||
SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, g.zVfs
|
||||
);
|
||||
if( rc!=SQLITE_OK ){
|
||||
fprintf(stderr, "sqlite3_open(): %s\n", sqlite3_errmsg(ctx.db));
|
||||
return 0;
|
||||
}
|
||||
sqlite3_create_function(
|
||||
ctx.db, "usleep", 1, SQLITE_UTF8, (void*)sqlite3_vfs_find(0),
|
||||
usleepFunc, 0, 0
|
||||
);
|
||||
|
||||
/* Register the wal-hook with the new client connection */
|
||||
sqlite3_wal_hook(ctx.db, clientWalHook, (void*)&ctx);
|
||||
|
||||
while( rc==SQLITE_OK ){
|
||||
int i;
|
||||
int iStart;
|
||||
int nConsume;
|
||||
res = read(ctx.fd, &zCmd[nCmd], sizeof(zCmd)-nCmd-1);
|
||||
if( res<=0 ) break;
|
||||
nCmd += res;
|
||||
if( nCmd>=sizeof(zCmd)-1 ){
|
||||
fprintf(stderr, "oversized (>32KiB) message\n");
|
||||
res = 0;
|
||||
break;
|
||||
}
|
||||
zCmd[nCmd] = '\0';
|
||||
|
||||
do {
|
||||
nConsume = 0;
|
||||
|
||||
/* Gobble up any whitespace */
|
||||
iStart = 0;
|
||||
while( is_whitespace(zCmd[iStart]) ) iStart++;
|
||||
|
||||
if( zCmd[iStart]=='.' ){
|
||||
/* This is a dot-command. Search for end-of-line. */
|
||||
for(i=iStart; i<nCmd; i++){
|
||||
if( is_eol(zCmd[i]) ){
|
||||
rc = handle_dot_command(&ctx, &zCmd[iStart], i-iStart);
|
||||
nConsume = i+1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}else{
|
||||
|
||||
int iSemi;
|
||||
char c = 0;
|
||||
for(iSemi=iStart; iSemi<nCmd; iSemi++){
|
||||
if( zCmd[iSemi]==';' ){
|
||||
c = zCmd[iSemi+1];
|
||||
zCmd[iSemi+1] = '\0';
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if( iSemi<nCmd ){
|
||||
if( sqlite3_complete(zCmd) ){
|
||||
rc = handle_some_sql(&ctx, zCmd, iSemi+1);
|
||||
nConsume = iSemi+1;
|
||||
}
|
||||
|
||||
if( c ){
|
||||
zCmd[iSemi+1] = c;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if( nConsume>0 ){
|
||||
nCmd = nCmd-nConsume;
|
||||
if( nCmd>0 ){
|
||||
memmove(zCmd, &zCmd[nConsume], nCmd);
|
||||
}
|
||||
}
|
||||
}while( rc==SQLITE_OK && nConsume>0 );
|
||||
}
|
||||
|
||||
fprintf(stdout, "Client %d disconnects\n", ctx.fd);
|
||||
fflush(stdout);
|
||||
close(ctx.fd);
|
||||
clear_sql(&ctx);
|
||||
sqlite3_free(ctx.aPrepare);
|
||||
sqlite3_close(ctx.db);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void usage(const char *zExec){
|
||||
fprintf(stderr, "Usage: %s ?-vfs VFS? DATABASE\n", zExec);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
int sfd;
|
||||
int rc;
|
||||
int yes = 1;
|
||||
struct sockaddr_in server;
|
||||
int i;
|
||||
|
||||
/* Ignore SIGPIPE. Otherwise the server exits if a client disconnects
|
||||
** abruptly. */
|
||||
signal(SIGPIPE, SIG_IGN);
|
||||
|
||||
g.nThreshold = TSERVER_DEFAULT_CHECKPOINT_THRESHOLD;
|
||||
if( (argc%2) ) usage(argv[0]);
|
||||
for(i=1; i<(argc-1); i+=2){
|
||||
int n = strlen(argv[i]);
|
||||
if( n>=2 && 0==sqlite3_strnicmp("-walautocheckpoint", argv[i], n) ){
|
||||
g.nThreshold = strtol(argv[i+1], 0, 0);
|
||||
}else
|
||||
if( n>=2 && 0==sqlite3_strnicmp("-vfs", argv[i], n) ){
|
||||
g.zVfs = argv[i+1];
|
||||
}
|
||||
}
|
||||
g.zDatabaseName = argv[argc-1];
|
||||
|
||||
g.commit_mutex = sqlite3_mutex_alloc(SQLITE_MUTEX_FAST);
|
||||
pthread_mutex_init(&g.ckpt_mutex, 0);
|
||||
pthread_cond_init(&g.ckpt_cond, 0);
|
||||
|
||||
rc = sqlite3_open_v2(g.zDatabaseName, &g.db,
|
||||
SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, g.zVfs
|
||||
);
|
||||
if( rc!=SQLITE_OK ){
|
||||
fprintf(stderr, "sqlite3_open(): %s\n", sqlite3_errmsg(g.db));
|
||||
return 1;
|
||||
}
|
||||
|
||||
rc = sqlite3_exec(g.db, "SELECT * FROM sqlite_master", 0, 0, 0);
|
||||
if( rc!=SQLITE_OK ){
|
||||
fprintf(stderr, "sqlite3_exec(): %s\n", sqlite3_errmsg(g.db));
|
||||
return 1;
|
||||
}
|
||||
|
||||
sfd = socket(AF_INET, SOCK_STREAM, 0);
|
||||
if( sfd<0 ){
|
||||
fprintf(stderr, "socket() failed\n");
|
||||
return 1;
|
||||
}
|
||||
|
||||
rc = setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
|
||||
if( rc<0 ){
|
||||
perror("setsockopt");
|
||||
return 1;
|
||||
}
|
||||
|
||||
memset(&server, 0, sizeof(server));
|
||||
server.sin_family = AF_INET;
|
||||
server.sin_addr.s_addr = inet_addr("127.0.0.1");
|
||||
server.sin_port = htons(TSERVER_PORTNUMBER);
|
||||
|
||||
rc = bind(sfd, (struct sockaddr *)&server, sizeof(struct sockaddr));
|
||||
if( rc<0 ){
|
||||
fprintf(stderr, "bind() failed\n");
|
||||
return 1;
|
||||
}
|
||||
|
||||
rc = listen(sfd, 8);
|
||||
if( rc<0 ){
|
||||
fprintf(stderr, "listen() failed\n");
|
||||
return 1;
|
||||
}
|
||||
|
||||
while( 1 ){
|
||||
pthread_t tid;
|
||||
int cfd = accept(sfd, NULL, NULL);
|
||||
if( cfd<0 ){
|
||||
perror("accept()");
|
||||
return 1;
|
||||
}
|
||||
|
||||
fprintf(stdout, "Client %d connects\n", cfd);
|
||||
fflush(stdout);
|
||||
rc = pthread_create(&tid, NULL, handle_client, (void*)(intptr_t)cfd);
|
||||
if( rc!=0 ){
|
||||
perror("pthread_create()");
|
||||
return 1;
|
||||
}
|
||||
|
||||
pthread_detach(tid);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
283
tool/tserver_test.tcl
Normal file
283
tool/tserver_test.tcl
Normal file
@@ -0,0 +1,283 @@
|
||||
#!/usr/bin/tclsh
|
||||
#
|
||||
# This script is used to run the performance test cases described in
|
||||
# README-server-edition.html.
|
||||
#
|
||||
|
||||
|
||||
package require sqlite3
|
||||
|
||||
# Default values for command line switches:
|
||||
set O(-database) ""
|
||||
set O(-rows) [expr 5000000]
|
||||
set O(-mode) wal2
|
||||
set O(-tserver) "./tserver"
|
||||
set O(-seconds) 20
|
||||
set O(-writers) 1
|
||||
set O(-readers) 0
|
||||
set O(-verbose) 0
|
||||
|
||||
|
||||
proc error_out {err} {
|
||||
puts stderr $err
|
||||
exit -1
|
||||
}
|
||||
|
||||
proc usage {} {
|
||||
puts stderr "Usage: $::argv0 ?OPTIONS?"
|
||||
puts stderr ""
|
||||
puts stderr "Where OPTIONS are:"
|
||||
puts stderr " -database <database file> (default: test.db)"
|
||||
puts stderr " -mode <mode> (default: wal2)"
|
||||
puts stderr " -rows <number of rows> (default: 5000000)"
|
||||
puts stderr " -tserver <path to tserver executable> (default: ./tserver)"
|
||||
puts stderr " -seconds <time to run for in seconds> (default: 20)"
|
||||
puts stderr " -writers <number of writer clients> (default: 1)"
|
||||
puts stderr " -readers <number of reader clients> (default: 0)"
|
||||
puts stderr " -verbose 0|1 (default: 0)"
|
||||
exit -1
|
||||
}
|
||||
|
||||
for {set i 0} {$i < [llength $argv]} {incr i} {
|
||||
set opt ""
|
||||
set arg [lindex $argv $i]
|
||||
set n [expr [string length $arg]-1]
|
||||
foreach k [array names ::O] {
|
||||
if {[string range $k 0 $n]==$arg} {
|
||||
if {$opt==""} {
|
||||
set opt $k
|
||||
} else {
|
||||
error_out "ambiguous option: $arg ($k or $opt)"
|
||||
}
|
||||
}
|
||||
}
|
||||
if {$opt==""} { usage }
|
||||
if {$i==[llength $argv]-1} {
|
||||
error_out "option requires an argument: $opt"
|
||||
}
|
||||
incr i
|
||||
set val [lindex $argv $i]
|
||||
switch -- $opt {
|
||||
-mode {
|
||||
if {$val != "wal" && $val != "wal2"} {
|
||||
set xyz "\"wal\" or \"wal2\""
|
||||
error_out "Found \"$val\" - expected $xyz"
|
||||
}
|
||||
}
|
||||
}
|
||||
set O($opt) [lindex $argv $i]
|
||||
}
|
||||
if {$O(-database)==""} {
|
||||
set O(-database) "test.db"
|
||||
}
|
||||
|
||||
set O(-rows) [expr $O(-rows)]
|
||||
|
||||
#--------------------------------------------------------------------------
|
||||
# Create and populate the required test database, if it is not already
|
||||
# present in the file-system.
|
||||
#
|
||||
proc create_test_database {} {
|
||||
global O
|
||||
|
||||
if {[file exists $O(-database)]} {
|
||||
sqlite3 db $O(-database)
|
||||
|
||||
# Check the schema looks Ok.
|
||||
set s [db one {
|
||||
SELECT group_concat(name||pk, '.') FROM pragma_table_info('t1');
|
||||
}]
|
||||
if {$s != "a1.b0.c0.d0"} {
|
||||
error_out "Database $O(-database) exists but is not usable (schema)"
|
||||
}
|
||||
|
||||
# Check that the row count matches.
|
||||
set n [db one { SELECT count(*) FROM t1 }]
|
||||
if {$n != $O(-rows)} {
|
||||
error_out "Database $O(-database) exists but is not usable (row-count)"
|
||||
}
|
||||
db close
|
||||
} else {
|
||||
catch { file delete -force $O(-database)-journal }
|
||||
catch { file delete -force $O(-database)-wal }
|
||||
|
||||
if {$O(-verbose)} {
|
||||
puts "Building database $O(-database)..."
|
||||
}
|
||||
|
||||
sqlite3 db $O(-database)
|
||||
db eval {
|
||||
CREATE TABLE t1(
|
||||
a INTEGER PRIMARY KEY,
|
||||
b BLOB(16),
|
||||
c BLOB(16),
|
||||
d BLOB(400)
|
||||
);
|
||||
CREATE INDEX i1 ON t1(b);
|
||||
CREATE INDEX i2 ON t1(c);
|
||||
|
||||
WITH s(i) AS (SELECT 1 UNION ALL SELECT i+1 FROM s WHERE i<$O(-rows))
|
||||
INSERT INTO t1
|
||||
SELECT i-1, randomblob(16), randomblob(16), randomblob(400) FROM s;
|
||||
}
|
||||
db close
|
||||
}
|
||||
|
||||
switch -- $O(-mode) {
|
||||
wal {
|
||||
sqlite3 db $O(-database)
|
||||
db eval {PRAGMA journal_mode = delete}
|
||||
db eval {PRAGMA journal_mode = wal}
|
||||
db close
|
||||
}
|
||||
|
||||
wal2 {
|
||||
sqlite3 db $O(-database)
|
||||
db eval {PRAGMA journal_mode = delete}
|
||||
db eval {PRAGMA journal_mode = wal2}
|
||||
db close
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#-------------------------------------------------------------------------
|
||||
# Functions to start and stop the tserver process:
|
||||
#
|
||||
# tserver_start
|
||||
# tserver_stop
|
||||
#
|
||||
set ::tserver {}
|
||||
proc tserver_start {} {
|
||||
global O
|
||||
set cmd "|$O(-tserver) -vfs unix "
|
||||
if {$O(-mode)=="wal2"} {
|
||||
append cmd " -walautocheckpoint 0 "
|
||||
}
|
||||
append cmd "$O(-database)"
|
||||
set ::tserver [open $cmd]
|
||||
fconfigure $::tserver -blocking 0
|
||||
fileevent $::tserver readable tserver_data
|
||||
}
|
||||
|
||||
proc tserver_data {} {
|
||||
global O
|
||||
if {[eof $::tserver]} {
|
||||
error_out "tserver has exited"
|
||||
}
|
||||
set line [gets $::tserver]
|
||||
if {$line != "" && $O(-verbose)} {
|
||||
puts "tserver: $line"
|
||||
}
|
||||
}
|
||||
|
||||
proc tserver_stop {} {
|
||||
close $::tserver
|
||||
set fd [socket localhost 9999]
|
||||
puts $fd ".stop"
|
||||
close $fd
|
||||
}
|
||||
#-------------------------------------------------------------------------
|
||||
|
||||
set ::nClient 0
|
||||
set ::client_output [list]
|
||||
|
||||
proc client_data {name fd} {
|
||||
global O
|
||||
if {[eof $fd]} {
|
||||
incr ::nClient -1
|
||||
close $fd
|
||||
return
|
||||
}
|
||||
set str [gets $fd]
|
||||
if {[string trim $str]!=""} {
|
||||
if {[string range $str 0 3]=="### "} {
|
||||
lappend ::client_output [concat [list name $name] [lrange $str 1 end]]
|
||||
}
|
||||
if {$O(-verbose)} {
|
||||
puts "$name: $str"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
proc client_launch {name script} {
|
||||
global O
|
||||
set fd [socket localhost 9999]
|
||||
fconfigure $fd -blocking 0
|
||||
puts $fd "PRAGMA synchronous = OFF;"
|
||||
puts $fd ".repeat 1"
|
||||
puts $fd ".run"
|
||||
puts $fd $script
|
||||
puts $fd ".seconds $O(-seconds)"
|
||||
puts $fd ".run"
|
||||
puts $fd ".quit"
|
||||
flush $fd
|
||||
incr ::nClient
|
||||
fileevent $fd readable [list client_data $name $fd]
|
||||
}
|
||||
|
||||
proc client_wait {} {
|
||||
while {$::nClient>0} {vwait ::nClient}
|
||||
}
|
||||
|
||||
proc script_writer {bCkpt} {
|
||||
global O
|
||||
|
||||
set commit ".mutex_commit"
|
||||
set begin "BEGIN CONCURRENT;"
|
||||
set ckpt ""
|
||||
if {$bCkpt} {
|
||||
set ckpt ".checkpoint 2000"
|
||||
}
|
||||
|
||||
set tail "randomblob(16), randomblob(16), randomblob(400));"
|
||||
return [subst -nocommands {
|
||||
$ckpt
|
||||
$begin
|
||||
REPLACE INTO t1 VALUES(abs(random() % $O(-rows)), $tail
|
||||
$commit
|
||||
}]
|
||||
}
|
||||
|
||||
proc script_reader {} {
|
||||
global O
|
||||
|
||||
return [subst -nocommands {
|
||||
BEGIN;
|
||||
SELECT * FROM t1 WHERE a>abs((random()%$O(-rows))) LIMIT 10;
|
||||
END;
|
||||
}]
|
||||
}
|
||||
|
||||
|
||||
create_test_database
|
||||
tserver_start
|
||||
|
||||
for {set i 0} {$i < $O(-writers)} {incr i} {
|
||||
client_launch w.$i [script_writer [expr {$i==0 && $O(-mode)=="wal2"}]]
|
||||
}
|
||||
for {set i 0} {$i < $O(-readers)} {incr i} {
|
||||
client_launch r.$i [script_reader]
|
||||
}
|
||||
client_wait
|
||||
|
||||
set name(w) "Writers"
|
||||
set name(r) "Readers"
|
||||
foreach r $::client_output {
|
||||
array set a $r
|
||||
set type [string range $a(name) 0 0]
|
||||
incr x($type.ok) $a(ok);
|
||||
incr x($type.busy) $a(busy);
|
||||
incr x($type.n) 1
|
||||
set t($type) 1
|
||||
}
|
||||
|
||||
foreach type [array names t] {
|
||||
set nTPS [expr $x($type.ok) / $O(-seconds)]
|
||||
set nC [expr $nTPS / $x($type.n)]
|
||||
set nTotal [expr $x($type.ok) + $x($type.busy)]
|
||||
set bp [format %.2f [expr $x($type.busy) * 100.0 / $nTotal]]
|
||||
puts "$name($type): $nTPS transactions/second ($nC per client) ($bp% busy)"
|
||||
}
|
||||
|
||||
tserver_stop
|
||||
|
Reference in New Issue
Block a user