commit 28e6c2a23d63e46a9af8b6695f94aa57469570fa
parent c92508b66b34ccf0f94e7d5a030c6686fb211b5b
Author: Jacob R. Edwards <jacob@jacobedwards.org>
Date: Tue, 22 Oct 2024 08:45:25 -0700
Use async-lock to lock server interactions
This may have been one of the big issues with duplicate IDs, as two
requests made at the same time would create two remote objects for
the same local object, and be returned two IDs pointing to two local
IDs.
Also updated updateIDs.
Diffstat:
2 files changed, 387 insertions(+), 61 deletions(-)
diff --git a/files/floorplans/floorplan/backend.js b/files/floorplans/floorplan/backend.js
@@ -1,4 +1,5 @@
import * as api from "/lib/api.js"
+import { AsyncLock } from "/lib/async-lock.js"
// Sequence numbers for uniqueKey
let sequences = {}
@@ -319,6 +320,8 @@ export class FloorplanBackend {
}
this.floorplan = floorplan
+ this.lock = new AsyncLock();
+
if (options.callbacks) {
this.callbacks = options.callbacks
}
@@ -731,48 +734,51 @@ export class FloorplanBackend {
return this.putServer()
}
- let newpos = this.history.place
- let dirty = this.history.between(this.serverPosition, newpos)
- if (dirty.length === 0) {
- console.log("Not updating server: already up to date")
- return Promise.resolve()
- }
+ let backend = this
+ const locked = function() {
+ let newpos = backend.history.place
+ let dirty = backend.history.between(backend.serverPosition, newpos)
+ if (dirty.length === 0) {
+ console.log("Not updating server: already up to date")
+ return Promise.resolve()
+ }
- let patch = []
- for (let i in dirty) {
- let op = dirty[i].op
- let id = parsePath(dirty[i].path)
- let value = dirty[i].value ? this.remapIDsValue(dirty[i].value, this.serverIDs) : null
- if (op === "new" || this.serverIDs[id] == null) {
- patch.push({ op: op, path: dirty[i].path, value: value })
- } else {
- patch.push({ op: op, path: idPath(this.serverIDs[id]), value })
+ let patch = []
+ for (let i in dirty) {
+ let op = dirty[i].op
+ let id = parsePath(dirty[i].path)
+ let value = dirty[i].value ? backend.remapIDsValue(dirty[i].value, backend.serverIDs) : null
+ if (op === "new" || backend.serverIDs[id] == null) {
+ patch.push({ op: "new", path: dirty[i].path, value: value })
+ } else {
+ patch.push({ op: op, path: idPath(backend.serverIDs[id]), value })
+ }
}
- }
- console.debug("Backend.push (patch)", patch)
+ console.debug("Backend.push (patch)", patch)
- let backend = this
- return api.fetch("PATCH", this.endpoint, patch)
- .then(function(data) {
- for (let i = 0; i < patch.length; ++i) {
- if (patch[i].op === "remove") {
- let id = parsePath(patch[i].path)
- backend.unmapID(id)
+ return api.fetch("PATCH", backend.endpoint, patch)
+ .then(function(data) {
+ for (let i = 0; i < patch.length; ++i) {
+ if (patch[i].op === "remove") {
+ let id = parsePath(patch[i].path)
+ backend.unmapID(id)
+ }
}
- }
- backend.serverPosition = newpos
- updateIDs(backend, data)
- for (let i in dirty) {
- delete dirty[i].dirty
- }
- backend.cb("push")
- })
- .catch(function(err) {
- console.error("Unable to PATCH floorplan, trying PUT", err)
- return backend.putServer()
- })
+ backend.serverPosition = newpos
+ updateIDs(backend, data)
+ for (let i in dirty) {
+ delete dirty[i].dirty
+ }
+ backend.cb("push")
+ })
+ .catch(function(err) {
+ console.error("Unable to PATCH floorplan, trying PUT", err)
+ return backend.putServer()
+ })
+ }
+ return this.lock.acquire("data", locked)
}
putServer() {
@@ -783,21 +789,28 @@ export class FloorplanBackend {
// WARNING: This needs a lock
let backend = this
- return api.fetch("PUT", this.endpoint, this.cache)
- .then(function(data) {
- for (let k in backend.serverIDs) {
- if (backend.serverIDs[k] !== null) {
- backend.unmapID(backend.serverIDs[k])
+ return this.lock.acquire("data", function() {
+ return api.fetch("PUT", backend.endpoint, backend.cache)
+ .then(function(data) {
+ for (let k in backend.serverIDs) {
+ if (backend.serverIDs[k] !== null) {
+ backend.unmapID(backend.serverIDs[k])
+ }
}
- }
- updateIDs(backend, data)
- backend.serverPosition = backend.history.place
- backend.cb("push")
- })
- .catch(function(err) {
- backend.cb("pusherror", err)
- throw err
- })
+ updateIDs(backend, data)
+ backend.serverPosition = backend.history.place
+ backend.cb("push")
+ })
+ .catch(function(err) {
+ if (!(err instanceof api.FetchError)) {
+ console.error(err, "Not a fetch error; undoing and trying again")
+ backend.undo()
+ return backend.push()
+ }
+ backend.cb("pusherror", err)
+ throw err
+ })
+ })
}
/*
@@ -818,15 +831,17 @@ export class FloorplanBackend {
}
let backend = this
- return api.fetch("GET", this.endpoint)
- .then(function(data) {
- data = backend.toLocalIDs(data)
- let diff = gendiff("", backend.cache, data)
- console.debug("Backend.Pull (diff)", diff)
- backend.applyDiff(diff, { clean: true })
- backend.cb("pull")
- backend.serverPosition = backend.history.place
- })
+ return this.lock.acquire("data", function() {
+ return api.fetch("GET", backend.endpoint)
+ .then(function(data) {
+ data = backend.toLocalIDs(data)
+ let diff = gendiff("", backend.cache, data)
+ console.debug("Backend.Pull (diff)", diff)
+ backend.applyDiff(diff, { clean: true })
+ backend.cb("pull")
+ backend.serverPosition = backend.history.place
+ })
+ })
}
applyDiff(diff, options) {
@@ -1073,8 +1088,8 @@ function updateIDs(backend, newdata) {
let x = newdata[t][srvID]
if (x.old_id != null) {
backend.remapID(x.old_id, srvID)
- } else {
- backend.remapID(srvID, srvID)
+ } else if (!backend.localIDs[srvID]) {
+ backend.mapID(backend.newID(idType(srvID)), srvID)
}
}
}
diff --git a/files/lib/async-lock.js b/files/lib/async-lock.js
@@ -0,0 +1,311 @@
+const DEFAULT_TIMEOUT = 0; //Never
+const DEFAULT_MAX_OCCUPATION_TIME = 0; //Never
+const DEFAULT_MAX_EXECUTION_TIME = 0; //Never
+const DEFAULT_MAX_PENDING = 1000;
+
+export class AsyncLock {
+ constructor(opts) {
+ opts = opts || {};
+
+ this.Promise = opts.Promise || Promise;
+
+ // format: {key : [fn, fn]}
+ // queues[key] = null indicates no job running for key
+ this.queues = Object.create(null);
+
+ // lock is reentrant for same domain
+ this.domainReentrant = opts.domainReentrant || false;
+ if (this.domainReentrant) {
+ if (typeof process === 'undefined' || typeof process.domain === 'undefined') {
+ throw new Error(
+ 'Domain-reentrant locks require `process.domain` to exist. Please flip `opts.domainReentrant = false`, ' +
+ 'use a NodeJS version that still implements Domain, or install a browser polyfill.');
+ }
+ // domain of current running func {key : fn}
+ this.domains = Object.create(null);
+ }
+
+ this.timeout = opts.timeout || DEFAULT_TIMEOUT;
+ this.maxOccupationTime = opts.maxOccupationTime || DEFAULT_MAX_OCCUPATION_TIME;
+ this.maxExecutionTime = opts.maxExecutionTime || DEFAULT_MAX_EXECUTION_TIME;
+ if (opts.maxPending === Infinity || (Number.isInteger(opts.maxPending) && opts.maxPending >= 0)) {
+ this.maxPending = opts.maxPending;
+ } else {
+ this.maxPending = DEFAULT_MAX_PENDING;
+ }
+
+ }
+
+ /**
+ * Acquire Locks
+ *
+ * @param {String|Array} key resource key or keys to lock
+ * @param {function} fn async function
+ * @param {function} cb callback function, otherwise will return a promise
+ * @param {Object} opts options
+ */
+ acquire(key, fn, cb, opts) {
+ if (Array.isArray(key)) {
+ return this._acquireBatch(key, fn, cb, opts);
+ }
+
+ if (typeof (fn) !== 'function') {
+ throw new Error('You must pass a function to execute');
+ }
+
+ // faux-deferred promise using new Promise() (as Promise.defer is deprecated)
+ var deferredResolve = null;
+ var deferredReject = null;
+ var deferred = null;
+
+ if (typeof (cb) !== 'function') {
+ opts = cb;
+ cb = null;
+
+ // will return a promise
+ deferred = new this.Promise(function(resolve, reject) {
+ deferredResolve = resolve;
+ deferredReject = reject;
+ });
+ }
+
+ opts = opts || {};
+
+ var resolved = false;
+ var timer = null;
+ var occupationTimer = null;
+ var executionTimer = null;
+ var self = this;
+
+ var done = function (locked, err, ret) {
+
+ if (occupationTimer) {
+ clearTimeout(occupationTimer);
+ occupationTimer = null;
+ }
+
+ if (executionTimer) {
+ clearTimeout(executionTimer);
+ executionTimer = null;
+ }
+
+ if (locked) {
+ if (!!self.queues[key] && self.queues[key].length === 0) {
+ delete self.queues[key];
+ }
+ if (self.domainReentrant) {
+ delete self.domains[key];
+ }
+ }
+
+ if (!resolved) {
+ if (!deferred) {
+ if (typeof (cb) === 'function') {
+ cb(err, ret);
+ }
+ }
+ else {
+ //promise mode
+ if (err) {
+ deferredReject(err);
+ }
+ else {
+ deferredResolve(ret);
+ }
+ }
+ resolved = true;
+ }
+
+ if (locked) {
+ //run next func
+ if (!!self.queues[key] && self.queues[key].length > 0) {
+ self.queues[key].shift()();
+ }
+ }
+ };
+
+ var exec = function (locked) {
+ if (resolved) { // may due to timed out
+ return done(locked);
+ }
+
+ if (timer) {
+ clearTimeout(timer);
+ timer = null;
+ }
+
+ if (self.domainReentrant && locked) {
+ self.domains[key] = process.domain;
+ }
+
+ var maxExecutionTime = opts.maxExecutionTime || self.maxExecutionTime;
+ if (maxExecutionTime) {
+ executionTimer = setTimeout(function () {
+ if (!!self.queues[key]) {
+ done(locked, new Error('Maximum execution time is exceeded ' + key));
+ }
+ }, maxExecutionTime);
+ }
+
+ // Callback mode
+ if (fn.length === 1) {
+ var called = false;
+ try {
+ fn(function (err, ret) {
+ if (!called) {
+ called = true;
+ done(locked, err, ret);
+ }
+ });
+ } catch (err) {
+ // catching error thrown in user function fn
+ if (!called) {
+ called = true;
+ done(locked, err);
+ }
+ }
+ }
+ else {
+ // Promise mode
+ self._promiseTry(function () {
+ return fn();
+ })
+ .then(function(ret){
+ done(locked, undefined, ret);
+ }, function(error){
+ done(locked, error);
+ });
+ }
+ };
+
+ if (self.domainReentrant && !!process.domain) {
+ exec = process.domain.bind(exec);
+ }
+
+ var maxPending = opts.maxPending || self.maxPending;
+
+ if (!self.queues[key]) {
+ self.queues[key] = [];
+ exec(true);
+ }
+ else if (self.domainReentrant && !!process.domain && process.domain === self.domains[key]) {
+ // If code is in the same domain of current running task, run it directly
+ // Since lock is re-enterable
+ exec(false);
+ }
+ else if (self.queues[key].length >= maxPending) {
+ done(false, new Error('Too many pending tasks in queue ' + key));
+ }
+ else {
+ var taskFn = function () {
+ exec(true);
+ };
+ if (opts.skipQueue) {
+ self.queues[key].unshift(taskFn);
+ } else {
+ self.queues[key].push(taskFn);
+ }
+
+ var timeout = opts.timeout || self.timeout;
+ if (timeout) {
+ timer = setTimeout(function () {
+ timer = null;
+ done(false, new Error('async-lock timed out in queue ' + key));
+ }, timeout);
+ }
+ }
+
+ var maxOccupationTime = opts.maxOccupationTime || self.maxOccupationTime;
+ if (maxOccupationTime) {
+ occupationTimer = setTimeout(function () {
+ if (!!self.queues[key]) {
+ done(false, new Error('Maximum occupation time is exceeded in queue ' + key));
+ }
+ }, maxOccupationTime);
+ }
+
+ if (deferred) {
+ return deferred;
+ }
+ };
+
+ /*
+ * Below is how this function works:
+ *
+ * Equivalent code:
+ * self.acquire(key1, function(cb){
+ * self.acquire(key2, function(cb){
+ * self.acquire(key3, fn, cb);
+ * }, cb);
+ * }, cb);
+ *
+ * Equivalent code:
+ * var fn3 = getFn(key3, fn);
+ * var fn2 = getFn(key2, fn3);
+ * var fn1 = getFn(key1, fn2);
+ * fn1(cb);
+ */
+ _acquireBatch(keys, fn, cb, opts) {
+ if (typeof (cb) !== 'function') {
+ opts = cb;
+ cb = null;
+ }
+
+ var self = this;
+ var getFn = function (key, fn) {
+ return function (cb) {
+ self.acquire(key, fn, cb, opts);
+ };
+ };
+
+ var fnx = keys.reduceRight(function (prev, key) {
+ return getFn(key, prev);
+ }, fn);
+
+ if (typeof (cb) === 'function') {
+ fnx(cb);
+ }
+ else {
+ return new this.Promise(function (resolve, reject) {
+ // check for promise mode in case keys is empty array
+ if (fnx.length === 1) {
+ fnx(function (err, ret) {
+ if (err) {
+ reject(err);
+ }
+ else {
+ resolve(ret);
+ }
+ });
+ } else {
+ resolve(fnx());
+ }
+ });
+ }
+ };
+
+ /*
+ * Whether there is any running or pending asyncFunc
+ *
+ * @param {String} key
+ */
+ isBusy(key) {
+ if (!key) {
+ return Object.keys(this.queues).length > 0;
+ }
+ else {
+ return !!this.queues[key];
+ }
+ };
+
+ /**
+ * Promise.try() implementation to become independent of Q-specific methods
+ */
+ _promiseTry(fn) {
+ try {
+ return this.Promise.resolve(fn());
+ } catch (e) {
+ return this.Promise.reject(e);
+ }
+ };
+}