1
0
mirror of https://github.com/sqlite/sqlite.git synced 2025-06-12 04:41:58 +03:00

An alternative messaging strategy for the OPFS VFS proxy which uses only SharedArrayBuffer and Atomics, instead of worker messages, for communication (only the initial one-time handshake during initialization uses worker messages). It runs speedtest1 approx. 15-20% faster but still 20-ish% slower than WASMFS.

FossilOrigin-Name: a83ee3082d89439ea3ad5737e63e25bebb0f91895aca006ce5fecf5b93a2651a
This commit is contained in:
stephan
2022-09-20 08:27:57 +00:00
parent 138647a552
commit 5e8bb0aa13
6 changed files with 200 additions and 145 deletions

View File

@ -63,7 +63,7 @@ const error = (...args)=>logImpl(0, ...args);
const metrics = Object.create(null);
metrics.reset = ()=>{
let k;
const r = (m)=>(m.count = m.time = 0);
const r = (m)=>(m.count = m.time = m.wait = 0);
for(k in state.opIds){
r(metrics[k] = Object.create(null));
}
@ -74,11 +74,15 @@ metrics.dump = ()=>{
const m = metrics[k];
n += m.count;
t += m.time;
w += m.wait;
m.avgTime = (m.count && m.time) ? (m.time / m.count) : 0;
}
console.log(self.location.href,
"metrics for",self.location.href,":",metrics,
"\nTotal of",n,"op(s) for",t,"ms");
"metrics for",self.location.href,":\n",
JSON.stringify(metrics,0,2)
/*dev console can't expand this object!*/,
"\nTotal of",n,"op(s) for",t,"ms",
"approx",w,"ms spent waiting on OPFS APIs.");
};
warn("This file is very much experimental and under construction.",
@ -130,9 +134,9 @@ const getDirForPath = async function f(absFilename, createDirs = false){
and then Atomics.notify()'s it.
*/
const storeAndNotify = (opName, value)=>{
log(opName+"() is notify()ing w/ value:",value);
Atomics.store(state.sabOPView, state.opIds[opName], value);
Atomics.notify(state.sabOPView, state.opIds[opName]);
log(opName+"() => notify(",state.rcIds[opName],",",value,")");
Atomics.store(state.sabOPView, state.rcIds[opName], value);
Atomics.notify(state.sabOPView, state.rcIds[opName]);
};
/**
@ -155,6 +159,17 @@ const mTimeStart = (op)=>{
const mTimeEnd = ()=>(
metrics[opTimer.op].time += performance.now() - opTimer.start
);
const waitTimer = Object.create(null);
waitTimer.op = undefined;
waitTimer.start = undefined;
const wTimeStart = (op)=>{
waitTimer.start = performance.now();
waitTimer.op = op;
//metrics[op] || toss("Maintenance required: missing metrics for",op);
};
const wTimeEnd = ()=>(
metrics[waitTimer.op].wait += performance.now() - waitTimer.start
);
/**
Asynchronous wrappers for sqlite3_vfs and sqlite3_io_methods
@ -163,17 +178,20 @@ const mTimeEnd = ()=>(
*/
const vfsAsyncImpls = {
mkdir: async function(dirname){
mTimeStart('mkdir');
let rc = 0;
wTimeStart('mkdir');
try {
await getDirForPath(dirname+"/filepart", true);
}catch(e){
//error("mkdir failed",filename, e.message);
rc = state.sq3Codes.SQLITE_IOERR;
}
wTimeEnd();
storeAndNotify('mkdir', rc);
mTimeEnd();
},
xAccess: async function(filename){
log("xAccess(",arguments[0],")");
mTimeStart('xAccess');
/* OPFS cannot support the full range of xAccess() queries sqlite3
calls for. We can essentially just tell if the file is
@ -187,20 +205,23 @@ const vfsAsyncImpls = {
accessible, non-0 means not accessible.
*/
let rc = 0;
wTimeStart('xAccess');
try{
const [dh, fn] = await getDirForPath(filename);
await dh.getFileHandle(fn);
}catch(e){
rc = state.sq3Codes.SQLITE_IOERR;
}
wTimeEnd();
storeAndNotify('xAccess', rc);
mTimeEnd();
},
xClose: async function(fid){
const opName = 'xClose';
mTimeStart(opName);
log(opName+"(",arguments[0],")");
const fh = __openFiles[fid];
let rc = 0;
wTimeStart('xClose');
if(fh){
delete __openFiles[fid];
if(fh.accessHandle) await fh.accessHandle.close();
@ -208,10 +229,11 @@ const vfsAsyncImpls = {
try{ await fh.dirHandle.removeEntry(fh.filenamePart) }
catch(e){ warn("Ignoring dirHandle.removeEntry() failure of",fh,e) }
}
storeAndNotify(opName, 0);
}else{
storeAndNotify(opName, state.sq3Codes.SQLITE_NOFOUND);
rc = state.sq3Codes.SQLITE_NOTFOUND;
}
wTimeEnd();
storeAndNotify(opName, rc);
mTimeEnd();
},
xDelete: async function(...args){
@ -233,12 +255,11 @@ const vfsAsyncImpls = {
presumably it will fail if the dir is not empty and that flag
is false.
*/
log("xDelete(",arguments[0],")");
let rc = 0;
wTimeStart('xDelete');
try {
while(filename){
const [hDir, filenamePart] = await getDirForPath(filename, false);
//log("Removing:",hDir, filenamePart);
if(!filenamePart) break;
await hDir.removeEntry(filenamePart, {recursive});
if(0x1234 !== syncDir) break;
@ -252,13 +273,14 @@ const vfsAsyncImpls = {
//error("Delete failed",filename, e.message);
rc = state.sq3Codes.SQLITE_IOERR_DELETE;
}
wTimeEnd();
return rc;
},
xFileSize: async function(fid){
mTimeStart('xFileSize');
log("xFileSize(",arguments,")");
const fh = __openFiles[fid];
let sz;
wTimeStart('xFileSize');
try{
sz = await fh.accessHandle.getSize();
state.s11n.serialize(Number(sz));
@ -267,15 +289,16 @@ const vfsAsyncImpls = {
error("xFileSize():",e, fh);
sz = state.sq3Codes.SQLITE_IOERR;
}
wTimeEnd();
storeAndNotify('xFileSize', sz);
mTimeEnd();
},
xOpen: async function(fid/*sqlite3_file pointer*/, filename, flags){
const opName = 'xOpen';
mTimeStart(opName);
log(opName+"(",arguments[0],")");
const deleteOnClose = (state.sq3Codes.SQLITE_OPEN_DELETEONCLOSE & flags);
const create = (state.sq3Codes.SQLITE_OPEN_CREATE & flags);
wTimeStart('xOpen');
try{
let hDir, filenamePart;
try {
@ -283,6 +306,7 @@ const vfsAsyncImpls = {
}catch(e){
storeAndNotify(opName, state.sql3Codes.SQLITE_NOTFOUND);
mTimeEnd();
wTimeEnd();
return;
}
const hFile = await hDir.getFileHandle(filenamePart, {create});
@ -294,6 +318,7 @@ const vfsAsyncImpls = {
places that limitation on it.
*/
fobj.accessHandle = await hFile.createSyncAccessHandle();
wTimeEnd();
__openFiles[fid] = fobj;
fobj.filenameAbs = filename;
fobj.filenamePart = filenamePart;
@ -304,6 +329,7 @@ const vfsAsyncImpls = {
fobj.deleteOnClose = deleteOnClose;
storeAndNotify(opName, 0);
}catch(e){
wTimeEnd();
error(opName,e);
storeAndNotify(opName, state.sq3Codes.SQLITE_IOERR);
}
@ -311,14 +337,15 @@ const vfsAsyncImpls = {
},
xRead: async function(fid,n,offset){
mTimeStart('xRead');
log("xRead(",arguments[0],")");
let rc = 0;
try{
const fh = __openFiles[fid];
wTimeStart('xRead');
const nRead = fh.accessHandle.read(
fh.sabView.subarray(0, n),
{at: Number(offset)}
);
wTimeEnd();
if(nRead < n){/* Zero-fill remaining bytes */
fh.sabView.fill(0, nRead, n);
rc = state.sq3Codes.SQLITE_IOERR_SHORT_READ;
@ -332,17 +359,20 @@ const vfsAsyncImpls = {
},
xSync: async function(fid,flags/*ignored*/){
mTimeStart('xSync');
log("xSync(",arguments[0],")");
const fh = __openFiles[fid];
if(!fh.readOnly && fh.accessHandle) await fh.accessHandle.flush();
if(!fh.readOnly && fh.accessHandle){
wTimeStart('xSync');
await fh.accessHandle.flush();
wTimeEnd();
}
storeAndNotify('xSync',0);
mTimeEnd();
},
xTruncate: async function(fid,size){
mTimeStart('xTruncate');
log("xTruncate(",arguments[0],")");
let rc = 0;
const fh = __openFiles[fid];
wTimeStart('xTruncate');
try{
affirmNotRO('xTruncate', fh);
await fh.accessHandle.truncate(size);
@ -350,13 +380,14 @@ const vfsAsyncImpls = {
error("xTruncate():",e,fh);
rc = state.sq3Codes.SQLITE_IOERR_TRUNCATE;
}
wTimeEnd();
storeAndNotify('xTruncate',rc);
mTimeEnd();
},
xWrite: async function(fid,n,offset){
mTimeStart('xWrite');
log("xWrite(",arguments[0],")");
let rc;
wTimeStart('xWrite');
try{
const fh = __openFiles[fid];
affirmNotRO('xWrite', fh);
@ -367,13 +398,14 @@ const vfsAsyncImpls = {
}catch(e){
error("xWrite():",e,fh);
rc = state.sq3Codes.SQLITE_IOERR_WRITE;
}finally{
wTimeEnd();
}
storeAndNotify('xWrite',rc);
mTimeEnd();
}
};
const initS11n = ()=>{
// Achtung: this code is 100% duplicated in the other half of this proxy!
if(state.s11n) return state.s11n;
@ -403,46 +435,69 @@ const initS11n = ()=>{
serialization for simplicy of implementation, but if that
proves imperformant then a lower-level approach will be
created.
If passed "too much data" (more that the shared buffer size
it will either throw or truncate the data (not certain
which)). This routine is only intended for serializing OPFS
VFS arguments and (in at least one special case) result
values, and the buffer is sized to be able to comfortably
handle those.
If passed no arguments then it zeroes out the serialization
state.
*/
state.s11n.serialize = function(...args){
const json = jsonEncoder.encode(JSON.stringify(args));
viewSz.setInt32(0, json.byteLength, state.littleEndian);
viewJson.set(json);
if(args.length){
const json = jsonEncoder.encode(JSON.stringify(args));
viewSz.setInt32(0, json.byteLength, state.littleEndian);
viewJson.set(json);
}else{
viewSz.setInt32(0, 0, state.littleEndian);
}
};
return state.s11n;
};
const waitLoop = function(){
const waitLoop = async function f(){
const opHandlers = Object.create(null);
for(let k of Object.keys(state.opIds)){
for(let k of Object.keys(state.rcIds)){
const o = Object.create(null);
opHandlers[state.opIds[k]] = o;
o.key = k;
o.f = vfsAsyncImpls[k];// || toss("No vfsAsyncImpls[",k,"]");
}
const sabOP = state.sabOP;
for(;;){
let metricsTimer = self.location.port>=1024 ? performance.now() : 0;
// ^^^ in dev environment, dump out these metrics one time after a delay.
while(true){
try {
Atomics.store(sabOP, state.opIds.whichOp, 0);
Atomic.wait(sabOP, state.opIds.whichOp);
const opId = Atomics.load(sabOP, state.opIds.whichOp);
if('timed-out'===Atomics.wait(state.sabOPView, state.opIds.whichOp, 0, 150)){
continue;
}
const opId = Atomics.load(state.sabOPView, state.opIds.whichOp);
Atomics.store(state.sabOPView, state.opIds.whichOp, 0);
const hnd = opHandlers[opId] ?? toss("No waitLoop handler for whichOp #",opId);
const args = state.s11n.deserialize();
log("whichOp =",opId,hnd,args);
const rc = 0/*TODO: run op*/;
Atomics.store(sabOP, state.rcIds[hnd.key], rc);
Atomics.notify(sabOP, state.rcIds[hnd.key]);
//warn("waitLoop() whichOp =",opId, hnd, args);
if(hnd.f) await hnd.f(...args);
else error("Missing callback for opId",opId);
}catch(e){
error('in waitLoop():',e.message);
}finally{
// We can't call metrics.dump() from the dev console because this
// thread is continually tied up in Atomics.wait(), so let's
// do, for dev purposes only, a dump one time after 60 seconds.
if(metricsTimer && (performance.now() > metricsTimer + 60000)){
metrics.dump();
metricsTimer = 0;
}
}
}
};
};
navigator.storage.getDirectory().then(function(d){
const wMsg = (type)=>postMessage({type});
state.rootDir = d;
log("state.rootDir =",state.rootDir);
self.onmessage = async function({data}){
log("self.onmessage()",data);
self.onmessage = function({data}){
switch(data.type){
case 'opfs-async-init':{
/* Receive shared state from synchronous partner */
@ -469,20 +524,7 @@ navigator.storage.getDirectory().then(function(d){
metrics.reset();
log("init state",state);
wMsg('opfs-async-inited');
break;
}
default:{
let err;
const m = vfsAsyncImpls[data.type] || toss("Unknown message type:",data.type);
try {
await m(...data.args).catch((e)=>err=e);
}catch(e){
err = e;
}
if(err){
error("Error handling",data.type+"():",e);
storeAndNotify(data.type, state.sq3Codes.SQLITE_ERROR);
}
waitLoop();
break;
}
}