mirror of
https://github.com/sqlite/sqlite.git
synced 2025-07-27 20:41:58 +03:00
Add test app for experimenting with multi-worker OPFS concurrency. Tweak OPFS VFS to significantly improve the otherwise "unfortunate" concurrency situation.
FossilOrigin-Name: 96f76e7616f8157a342b9e1c42f7b1feab200d182268871a2b25f67d4ee2564c
This commit is contained in:
@ -7,6 +7,7 @@ _sqlite3_bind_null
|
||||
_sqlite3_bind_parameter_count
|
||||
_sqlite3_bind_parameter_index
|
||||
_sqlite3_bind_text
|
||||
_sqlite3_busy_timeout
|
||||
_sqlite3_changes
|
||||
_sqlite3_changes64
|
||||
_sqlite3_clear_bindings
|
||||
|
@ -59,6 +59,9 @@ const toExportForES6 =
|
||||
li.pop();
|
||||
initModuleState.sqlite3Dir = li.join('/') + '/';
|
||||
}
|
||||
if(initModuleState.sqlite3Dir){
|
||||
initModuleState.sqlite3Dir = initModuleState.sqlite3Dir.replace(/[/]{2,}/g,'/');
|
||||
}
|
||||
|
||||
self.sqlite3InitModule = (...args)=>{
|
||||
//console.warn("Using replaced sqlite3InitModule()",self.location);
|
||||
|
@ -92,7 +92,8 @@ const installOpfsVfs = function callee(options){
|
||||
}
|
||||
const urlParams = new URL(self.location.href).searchParams;
|
||||
if(undefined===options.verbose){
|
||||
options.verbose = urlParams.has('opfs-verbose') ? 3 : 2;
|
||||
options.verbose = urlParams.has('opfs-verbose')
|
||||
? (+urlParams.get('opfs-verbose') || 2) : 1;
|
||||
}
|
||||
if(undefined===options.sanityChecks){
|
||||
options.sanityChecks = urlParams.has('opfs-sanity-check');
|
||||
@ -101,6 +102,8 @@ const installOpfsVfs = function callee(options){
|
||||
options.proxyUri = callee.defaultProxyUri;
|
||||
}
|
||||
|
||||
//console.warn("OPFS options =",options,self.location);
|
||||
|
||||
if('function' === typeof options.proxyUri){
|
||||
options.proxyUri = options.proxyUri();
|
||||
}
|
||||
@ -1154,7 +1157,10 @@ const installOpfsVfs = function callee(options){
|
||||
[
|
||||
/* Truncate journal mode is faster than delete or wal for
|
||||
this vfs, per speedtest1. */
|
||||
"pragma journal_mode=truncate;"
|
||||
"pragma journal_mode=truncate;",
|
||||
/* Set a default busy-timeout handler to help OPFS dbs
|
||||
deal with multi-tab/multi-worker contention. */
|
||||
"pragma busy_timeout=2000;",
|
||||
/*
|
||||
This vfs benefits hugely from cache on moderate/large
|
||||
speedtest1 --size 50 and --size 100 workloads. We currently
|
||||
@ -1162,7 +1168,7 @@ const installOpfsVfs = function callee(options){
|
||||
sqlite3.wasm. If that policy changes, the cache can
|
||||
be set here.
|
||||
*/
|
||||
//"pragma cache_size=-8388608;"
|
||||
//"pragma cache_size=-16384;"
|
||||
].join("")
|
||||
);
|
||||
}
|
||||
|
@ -897,6 +897,7 @@ self.sqlite3ApiBootstrap = function sqlite3ApiBootstrap(
|
||||
the lines of sqlite3_prepare_v3(). The slightly problematic
|
||||
part is the final argument (text destructor). */
|
||||
],
|
||||
["sqlite3_busy_timeout","int", "sqlite3*", "int"],
|
||||
["sqlite3_close_v2", "int", "sqlite3*"],
|
||||
["sqlite3_changes", "int", "sqlite3*"],
|
||||
["sqlite3_clear_bindings","int", "sqlite3_stmt*"],
|
||||
|
@ -53,7 +53,7 @@ const state = Object.create(null);
|
||||
2 = warnings and errors
|
||||
3 = debug, warnings, and errors
|
||||
*/
|
||||
state.verbose = 2;
|
||||
state.verbose = 1;
|
||||
|
||||
const loggers = {
|
||||
0:console.error.bind(console),
|
||||
@ -150,70 +150,6 @@ const getDirForFilename = async function f(absFilename, createDirs = false){
|
||||
return [dh, filename];
|
||||
};
|
||||
|
||||
/**
|
||||
An error class specifically for use with getSyncHandle(), the goal
|
||||
of which is to eventually be able to distinguish unambiguously
|
||||
between locking-related failures and other types, noting that we
|
||||
cannot currently do so because createSyncAccessHandle() does not
|
||||
define its exceptions in the required level of detail.
|
||||
*/
|
||||
class GetSyncHandleError extends Error {
|
||||
constructor(errorObject, ...msg){
|
||||
super();
|
||||
this.error = errorObject;
|
||||
this.message = [
|
||||
...msg, ': Original exception ['+errorObject.name+']:',
|
||||
errorObject.message
|
||||
].join(' ');
|
||||
this.name = 'GetSyncHandleError';
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
Returns the sync access handle associated with the given file
|
||||
handle object (which must be a valid handle object, as created by
|
||||
xOpen()), lazily opening it if needed.
|
||||
|
||||
In order to help alleviate cross-tab contention for a dabase,
|
||||
if an exception is thrown while acquiring the handle, this routine
|
||||
will wait briefly and try again, up to 3 times. If acquisition
|
||||
still fails at that point it will give up and propagate the
|
||||
exception.
|
||||
*/
|
||||
const getSyncHandle = async (fh)=>{
|
||||
if(!fh.syncHandle){
|
||||
const t = performance.now();
|
||||
log("Acquiring sync handle for",fh.filenameAbs);
|
||||
const maxTries = 4, msBase = 300;
|
||||
let i = 1, ms = msBase;
|
||||
for(; true; ms = msBase * ++i){
|
||||
try {
|
||||
//if(i<3) toss("Just testing getSyncHandle() wait-and-retry.");
|
||||
//TODO? A config option which tells it to throw here
|
||||
//randomly every now and then, for testing purposes.
|
||||
fh.syncHandle = await fh.fileHandle.createSyncAccessHandle();
|
||||
break;
|
||||
}catch(e){
|
||||
if(i === maxTries){
|
||||
throw new GetSyncHandleError(
|
||||
e, "Error getting sync handle.",maxTries,
|
||||
"attempts failed.",fh.filenameAbs
|
||||
);
|
||||
}
|
||||
warn("Error getting sync handle. Waiting",ms,
|
||||
"ms and trying again.",fh.filenameAbs,e);
|
||||
Atomics.wait(state.sabOPView, state.opIds.retry, 0, ms);
|
||||
}
|
||||
}
|
||||
log("Got sync handle for",fh.filenameAbs,'in',performance.now() - t,'ms');
|
||||
if(!fh.xLock){
|
||||
__autoLocks.add(fh.fid);
|
||||
log("Auto-locked",fh.fid,fh.filenameAbs);
|
||||
}
|
||||
}
|
||||
return fh.syncHandle;
|
||||
};
|
||||
|
||||
/**
|
||||
If the given file-holding object has a sync handle attached to it,
|
||||
that handle is remove and asynchronously closed. Though it may
|
||||
@ -253,6 +189,101 @@ const closeSyncHandleNoThrow = async (fh)=>{
|
||||
}
|
||||
};
|
||||
|
||||
/* Release all auto-locks. */
|
||||
const closeAutoLocks = async ()=>{
|
||||
if(__autoLocks.size){
|
||||
/* Release all auto-locks. */
|
||||
for(const fid of __autoLocks){
|
||||
const fh = __openFiles[fid];
|
||||
await closeSyncHandleNoThrow(fh);
|
||||
log("Auto-unlocked",fid,fh.filenameAbs);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
An error class specifically for use with getSyncHandle(), the goal
|
||||
of which is to eventually be able to distinguish unambiguously
|
||||
between locking-related failures and other types, noting that we
|
||||
cannot currently do so because createSyncAccessHandle() does not
|
||||
define its exceptions in the required level of detail.
|
||||
*/
|
||||
class GetSyncHandleError extends Error {
|
||||
constructor(errorObject, ...msg){
|
||||
super();
|
||||
this.error = errorObject;
|
||||
this.message = [
|
||||
...msg, ': Original exception ['+errorObject.name+']:',
|
||||
errorObject.message
|
||||
].join(' ');
|
||||
this.name = 'GetSyncHandleError';
|
||||
}
|
||||
};
|
||||
GetSyncHandleError.convertRc = (e,rc)=>{
|
||||
if(1){
|
||||
/* This approach returns SQLITE_LOCKED to the C API
|
||||
when getSyncHandle() fails but makes the very
|
||||
wild assumption that such a failure _is_ a locking
|
||||
error. In practice that appears to be the most
|
||||
common error, by far, but we cannot unambiguously
|
||||
distinguish that from other errors.
|
||||
|
||||
This approach demonstrably reduces concurrency-related
|
||||
errors but is highly questionable.
|
||||
*/
|
||||
return (e instanceof GetSyncHandleError)
|
||||
? state.sq3Codes.SQLITE_LOCKED
|
||||
: rc;
|
||||
}else{
|
||||
return ec;
|
||||
}
|
||||
}
|
||||
/**
|
||||
Returns the sync access handle associated with the given file
|
||||
handle object (which must be a valid handle object, as created by
|
||||
xOpen()), lazily opening it if needed.
|
||||
|
||||
In order to help alleviate cross-tab contention for a dabase,
|
||||
if an exception is thrown while acquiring the handle, this routine
|
||||
will wait briefly and try again, up to 3 times. If acquisition
|
||||
still fails at that point it will give up and propagate the
|
||||
exception.
|
||||
*/
|
||||
const getSyncHandle = async (fh)=>{
|
||||
if(!fh.syncHandle){
|
||||
const t = performance.now();
|
||||
log("Acquiring sync handle for",fh.filenameAbs);
|
||||
const maxTries = 4, msBase = 300;
|
||||
let i = 1, ms = msBase;
|
||||
for(; true; ms = msBase * ++i){
|
||||
try {
|
||||
//if(i<3) toss("Just testing getSyncHandle() wait-and-retry.");
|
||||
//TODO? A config option which tells it to throw here
|
||||
//randomly every now and then, for testing purposes.
|
||||
fh.syncHandle = await fh.fileHandle.createSyncAccessHandle();
|
||||
break;
|
||||
}catch(e){
|
||||
if(i === maxTries){
|
||||
throw new GetSyncHandleError(
|
||||
e, "Error getting sync handle.",maxTries,
|
||||
"attempts failed.",fh.filenameAbs
|
||||
);
|
||||
}
|
||||
warn("Error getting sync handle. Waiting",ms,
|
||||
"ms and trying again.",fh.filenameAbs,e);
|
||||
//await closeAutoLocks();
|
||||
Atomics.wait(state.sabOPView, state.opIds.retry, 0, ms);
|
||||
}
|
||||
}
|
||||
log("Got sync handle for",fh.filenameAbs,'in',performance.now() - t,'ms');
|
||||
if(!fh.xLock){
|
||||
__autoLocks.add(fh.fid);
|
||||
log("Auto-locked",fh.fid,fh.filenameAbs);
|
||||
}
|
||||
}
|
||||
return fh.syncHandle;
|
||||
};
|
||||
|
||||
/**
|
||||
Stores the given value at state.sabOPView[state.opIds.rc] and then
|
||||
Atomics.notify()'s it.
|
||||
@ -451,7 +482,7 @@ const vfsAsyncImpls = {
|
||||
rc = 0;
|
||||
}catch(e){
|
||||
state.s11n.storeException(2,e);
|
||||
rc = state.sq3Codes.SQLITE_IOERR;
|
||||
rc = GetSyncHandleError.convertRc(e,state.sq3Codes.SQLITE_IOERR);
|
||||
}
|
||||
wTimeEnd();
|
||||
storeAndNotify('xFileSize', rc);
|
||||
@ -471,7 +502,7 @@ const vfsAsyncImpls = {
|
||||
__autoLocks.delete(fid);
|
||||
}catch(e){
|
||||
state.s11n.storeException(1,e);
|
||||
rc = state.sq3Codes.SQLITE_IOERR_LOCK;
|
||||
rc = GetSyncHandleError.convertRc(e,state.sq3Codes.SQLITE_IOERR_LOCK);
|
||||
fh.xLock = oldLockType;
|
||||
}
|
||||
wTimeEnd();
|
||||
@ -545,7 +576,7 @@ const vfsAsyncImpls = {
|
||||
if(undefined===nRead) wTimeEnd();
|
||||
error("xRead() failed",e,fh);
|
||||
state.s11n.storeException(1,e);
|
||||
rc = state.sq3Codes.SQLITE_IOERR_READ;
|
||||
rc = GetSyncHandleError.convertRc(e,state.sq3Codes.SQLITE_IOERR_READ);
|
||||
}
|
||||
storeAndNotify('xRead',rc);
|
||||
mTimeEnd();
|
||||
@ -579,7 +610,7 @@ const vfsAsyncImpls = {
|
||||
}catch(e){
|
||||
error("xTruncate():",e,fh);
|
||||
state.s11n.storeException(2,e);
|
||||
rc = state.sq3Codes.SQLITE_IOERR_TRUNCATE;
|
||||
rc = GetSyncHandleError.convertRc(e,state.sq3Codes.SQLITE_IOERR_TRUNCATE);
|
||||
}
|
||||
wTimeEnd();
|
||||
storeAndNotify('xTruncate',rc);
|
||||
@ -619,7 +650,7 @@ const vfsAsyncImpls = {
|
||||
}catch(e){
|
||||
error("xWrite():",e,fh);
|
||||
state.s11n.storeException(1,e);
|
||||
rc = state.sq3Codes.SQLITE_IOERR_WRITE;
|
||||
rc = GetSyncHandleError.convertRc(e,state.sq3Codes.SQLITE_IOERR_WRITE);
|
||||
}
|
||||
wTimeEnd();
|
||||
storeAndNotify('xWrite',rc);
|
||||
@ -746,22 +777,16 @@ const waitLoop = async function f(){
|
||||
/**
|
||||
waitTime is how long (ms) to wait for each Atomics.wait().
|
||||
We need to wake up periodically to give the thread a chance
|
||||
to do other things.
|
||||
to do other things. If this is too high (e.g. 500ms) then
|
||||
even two workers/tabs can easily run into locking errors.
|
||||
*/
|
||||
const waitTime = 500;
|
||||
const waitTime = 150;
|
||||
while(!flagAsyncShutdown){
|
||||
try {
|
||||
if('timed-out'===Atomics.wait(
|
||||
state.sabOPView, state.opIds.whichOp, 0, waitTime
|
||||
)){
|
||||
if(__autoLocks.size){
|
||||
/* Release all auto-locks. */
|
||||
for(const fid of __autoLocks){
|
||||
const fh = __openFiles[fid];
|
||||
await closeSyncHandleNoThrow(fh);
|
||||
log("Auto-unlocked",fid,fh.filenameAbs);
|
||||
}
|
||||
}
|
||||
await closeAutoLocks();
|
||||
continue;
|
||||
}
|
||||
const opId = Atomics.load(state.sabOPView, state.opIds.whichOp);
|
||||
@ -791,7 +816,7 @@ navigator.storage.getDirectory().then(function(d){
|
||||
const opt = data.args;
|
||||
state.littleEndian = opt.littleEndian;
|
||||
state.asyncS11nExceptions = opt.asyncS11nExceptions;
|
||||
state.verbose = opt.verbose ?? 2;
|
||||
state.verbose = opt.verbose ?? 1;
|
||||
state.fileBufferSize = opt.fileBufferSize;
|
||||
state.sabS11nOffset = opt.sabS11nOffset;
|
||||
state.sabS11nSize = opt.sabS11nSize;
|
||||
|
@ -104,6 +104,9 @@
|
||||
synchronous sqlite3_vfs interface and the async OPFS
|
||||
impl.
|
||||
</li>
|
||||
<li><a href='tests/opfs/concurrency/index.html'>OPFS concurrency</a>
|
||||
tests using multiple workers.
|
||||
</li>
|
||||
</ul>
|
||||
</li>
|
||||
<!--li><a href='x.html'></a></li-->
|
||||
|
34
ext/wasm/tests/opfs/concurrency/index.html
Normal file
34
ext/wasm/tests/opfs/concurrency/index.html
Normal file
@ -0,0 +1,34 @@
|
||||
<!doctype html>
|
||||
<html lang="en-us">
|
||||
<head>
|
||||
<meta charset="utf-8">
|
||||
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
|
||||
<link rel="shortcut icon" href="data:image/x-icon;," type="image/x-icon">
|
||||
<link rel="stylesheet" href="../../../common/testing.css"/>
|
||||
<title>sqlite3 OPFS Worker concurrency tester</title>
|
||||
<style>
|
||||
body { display: revert; }
|
||||
body > * {}
|
||||
#test-output {
|
||||
font-family: monospace;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<h1></h1>
|
||||
<p>
|
||||
OPFS concurrency tester using multiple independent Workers.
|
||||
This app is incomplete.
|
||||
</p>
|
||||
<div class='input-wrapper'>
|
||||
<input type='checkbox' id='cb-log-reverse'>
|
||||
<label for='cb-log-reverse'>Reverse log order?</label>
|
||||
</div>
|
||||
<div id='test-output'></div>
|
||||
<script>(function(){
|
||||
document.querySelector('h1').innerHTML =
|
||||
document.querySelector('title').innerHTML;
|
||||
})();</script>
|
||||
<script src="test.js?sqlite3.dir=../../../jswasm"></script>
|
||||
</body>
|
||||
</html>
|
97
ext/wasm/tests/opfs/concurrency/test.js
Normal file
97
ext/wasm/tests/opfs/concurrency/test.js
Normal file
@ -0,0 +1,97 @@
|
||||
(async function(self){
|
||||
|
||||
const logClass = (function(){
|
||||
const mapToString = (v)=>{
|
||||
switch(typeof v){
|
||||
case 'number': case 'string': case 'boolean':
|
||||
case 'undefined': case 'bigint':
|
||||
return ''+v;
|
||||
default: break;
|
||||
}
|
||||
if(null===v) return 'null';
|
||||
if(v instanceof Error){
|
||||
v = {
|
||||
message: v.message,
|
||||
stack: v.stack,
|
||||
errorClass: v.name
|
||||
};
|
||||
}
|
||||
return JSON.stringify(v,undefined,2);
|
||||
};
|
||||
const normalizeArgs = (args)=>args.map(mapToString);
|
||||
const logTarget = document.querySelector('#test-output');
|
||||
const logClass = function(cssClass,...args){
|
||||
const ln = document.createElement('div');
|
||||
if(cssClass){
|
||||
for(const c of (Array.isArray(cssClass) ? cssClass : [cssClass])){
|
||||
ln.classList.add(c);
|
||||
}
|
||||
}
|
||||
ln.append(document.createTextNode(normalizeArgs(args).join(' ')));
|
||||
logTarget.append(ln);
|
||||
};
|
||||
const cbReverse = document.querySelector('#cb-log-reverse');
|
||||
const cbReverseKey = 'tester1:cb-log-reverse';
|
||||
const cbReverseIt = ()=>{
|
||||
logTarget.classList[cbReverse.checked ? 'add' : 'remove']('reverse');
|
||||
localStorage.setItem(cbReverseKey, cbReverse.checked ? 1 : 0);
|
||||
};
|
||||
cbReverse.addEventListener('change', cbReverseIt, true);
|
||||
if(localStorage.getItem(cbReverseKey)){
|
||||
cbReverse.checked = !!(+localStorage.getItem(cbReverseKey));
|
||||
}
|
||||
cbReverseIt();
|
||||
return logClass;
|
||||
})();
|
||||
const stdout = (...args)=>logClass('',...args);
|
||||
const stderr = (...args)=>logClass('error',...args);
|
||||
|
||||
const wait = async (ms)=>{
|
||||
return new Promise((resolve)=>setTimeout(resolve,ms));
|
||||
};
|
||||
|
||||
const urlArgsJs = new URL(document.currentScript.src).searchParams;
|
||||
const urlArgsHtml = new URL(self.location.href).searchParams;
|
||||
const options = Object.create(null);
|
||||
options.sqlite3Dir = urlArgsJs.get('sqlite3.dir');
|
||||
options.workerCount = (
|
||||
urlArgsHtml.has('workers') ? +urlArgsHtml.get('workers') : 3
|
||||
) || 3;
|
||||
const workers = [];
|
||||
workers.post = (type,...args)=>{
|
||||
for(const w of workers) w.postMessage({type, payload:args});
|
||||
};
|
||||
workers.loadedCount = 0;
|
||||
workers.onmessage = function(msg){
|
||||
msg = msg.data;
|
||||
const wName = msg.worker;
|
||||
const prefix = 'Worker ['+wName+']:';
|
||||
switch(msg.type){
|
||||
case 'stdout': stdout(prefix,...msg.payload); break;
|
||||
case 'stderr': stderr(prefix,...msg.payload); break;
|
||||
case 'error': stderr(prefix,"ERROR:",...msg.payload); break;
|
||||
case 'loaded':
|
||||
stdout(prefix,"loaded");
|
||||
if(++workers.loadedCount === workers.length){
|
||||
stdout("All workers loaded. Telling them to run...");
|
||||
workers.post('run');
|
||||
}
|
||||
break;
|
||||
default: logClass('error',"Unhandled message type:",msg); break;
|
||||
}
|
||||
};
|
||||
|
||||
stdout("Launching",options.workerCount,"workers...");
|
||||
workers.uri = (
|
||||
'worker.js?'
|
||||
+ 'sqlite3.dir='+options.sqlite3Dir
|
||||
+ '&opfs-verbose=2'
|
||||
);
|
||||
for(let i = 0; i < options.workerCount; ++i){
|
||||
stdout("Launching worker...");
|
||||
workers.push(new Worker(workers.uri+(i ? '' : '&unlink-db')));
|
||||
}
|
||||
// Have to delay onmessage assignment until after the loop
|
||||
// to avoid that early workers get an undue head start.
|
||||
workers.forEach((w)=>w.onmessage = workers.onmessage);
|
||||
})(self);
|
95
ext/wasm/tests/opfs/concurrency/worker.js
Normal file
95
ext/wasm/tests/opfs/concurrency/worker.js
Normal file
@ -0,0 +1,95 @@
|
||||
importScripts(
|
||||
(new URL(self.location.href).searchParams).get('sqlite3.dir') + '/sqlite3.js'
|
||||
);
|
||||
self.sqlite3InitModule().then(async function(sqlite3){
|
||||
const wName = Math.round(Math.random()*10000);
|
||||
const wPost = (type,...payload)=>{
|
||||
postMessage({type, worker: wName, payload});
|
||||
};
|
||||
const stdout = (...args)=>wPost('stdout',...args);
|
||||
const stderr = (...args)=>wPost('stderr',...args);
|
||||
const postErr = (...args)=>wPost('error',...args);
|
||||
if(!sqlite3.opfs){
|
||||
stderr("OPFS support not detected. Aborting.");
|
||||
return;
|
||||
}
|
||||
|
||||
const wait = async (ms)=>{
|
||||
return new Promise((resolve)=>setTimeout(resolve,ms));
|
||||
};
|
||||
|
||||
const dbName = 'concurrency-tester.db';
|
||||
if((new URL(self.location.href).searchParams).has('unlink-db')){
|
||||
await sqlite3.opfs.unlink(dbName);
|
||||
stdout("Unlinked",dbName);
|
||||
}
|
||||
wPost('loaded');
|
||||
|
||||
const run = async function(){
|
||||
const db = new sqlite3.opfs.OpfsDb(dbName);
|
||||
//sqlite3.capi.sqlite3_busy_timeout(db.pointer, 2000);
|
||||
db.transaction((db)=>{
|
||||
db.exec([
|
||||
"create table if not exists t1(w TEXT UNIQUE ON CONFLICT REPLACE,v);",
|
||||
"create table if not exists t2(w TEXT UNIQUE ON CONFLICT REPLACE,v);"
|
||||
]);
|
||||
});
|
||||
|
||||
const maxIterations = 10;
|
||||
const interval = Object.assign(Object.create(null),{
|
||||
delay: 300,
|
||||
handle: undefined,
|
||||
count: 0
|
||||
});
|
||||
stdout("Starting interval-based db updates with delay of",interval.delay,"ms.");
|
||||
const doWork = async ()=>{
|
||||
const tm = new Date().getTime();
|
||||
++interval.count;
|
||||
const prefix = "v(#"+interval.count+")";
|
||||
stdout("Setting",prefix,"=",tm);
|
||||
try{
|
||||
db.exec({
|
||||
sql:"INSERT OR REPLACE INTO t1(w,v) VALUES(?,?)",
|
||||
bind: [wName, new Date().getTime()]
|
||||
});
|
||||
//stdout("Set",prefix);
|
||||
}catch(e){
|
||||
interval.error = e;
|
||||
}
|
||||
};
|
||||
const finish = ()=>{
|
||||
if(interval.error) stderr("Ending work due to error:",e.message);
|
||||
else stdout("Ending work after",interval.count,"interval(s)");
|
||||
db.close();
|
||||
};
|
||||
if(1){/*use setInterval()*/
|
||||
interval.handle = setInterval(async ()=>{
|
||||
await doWork();
|
||||
if(interval.error || maxIterations === interval.count){
|
||||
clearInterval(interval.handle);
|
||||
finish();
|
||||
}
|
||||
}, interval.delay);
|
||||
}else{
|
||||
/*This approach provides no concurrency whatsoever: each worker
|
||||
is run to completion before any others can work.*/
|
||||
let i;
|
||||
for(i = 0; i < maxIterations; ++i){
|
||||
await doWork();
|
||||
if(interval.error) break;
|
||||
await wait(interval.ms);
|
||||
}
|
||||
finish();
|
||||
}
|
||||
}/*run()*/;
|
||||
|
||||
self.onmessage = function({data}){
|
||||
switch(data.type){
|
||||
case 'run': run().catch((e)=>postErr(e.message));
|
||||
break;
|
||||
default:
|
||||
stderr("Unhandled message type '"+data.type+"'.");
|
||||
break;
|
||||
}
|
||||
};
|
||||
});
|
Reference in New Issue
Block a user