www.spaceplanner.app

Web client to the spaceplanner API
git clone git://jacobedwards.org/www.spaceplanner.app
Log | Files | Refs

async-lock.js (7109B)


      1 const DEFAULT_TIMEOUT = 0; //Never
      2 const DEFAULT_MAX_OCCUPATION_TIME = 0; //Never
      3 const DEFAULT_MAX_EXECUTION_TIME = 0; //Never
      4 const DEFAULT_MAX_PENDING = 1000;
      5 
      6 export class AsyncLock {
      7 	constructor(opts) {
      8 		opts = opts || {};
      9 
     10 		this.Promise = opts.Promise || Promise;
     11 
     12 		// format: {key : [fn, fn]}
     13 		// queues[key] = null indicates no job running for key
     14 		this.queues = Object.create(null);
     15 
     16 		// lock is reentrant for same domain
     17 		this.domainReentrant = opts.domainReentrant || false;
     18 		if (this.domainReentrant) {
     19 			if (typeof process === 'undefined' || typeof process.domain === 'undefined') {
     20 				throw new Error(
     21 					'Domain-reentrant locks require `process.domain` to exist. Please flip `opts.domainReentrant = false`, ' +
     22 					'use a NodeJS version that still implements Domain, or install a browser polyfill.');
     23 			}
     24 			// domain of current running func {key : fn}
     25 			this.domains = Object.create(null);
     26 		}
     27 
     28 		this.timeout = opts.timeout || DEFAULT_TIMEOUT;
     29 		this.maxOccupationTime = opts.maxOccupationTime || DEFAULT_MAX_OCCUPATION_TIME;
     30 		this.maxExecutionTime = opts.maxExecutionTime || DEFAULT_MAX_EXECUTION_TIME;
     31 		if (opts.maxPending === Infinity || (Number.isInteger(opts.maxPending) && opts.maxPending >= 0)) {
     32 			this.maxPending = opts.maxPending;
     33 		} else {
     34 			this.maxPending = DEFAULT_MAX_PENDING;
     35 		}
     36 
     37 	}
     38 
     39 	/**
     40 	 * Acquire Locks
     41 	 *
     42 	 * @param {String|Array} key 	resource key or keys to lock
     43 	 * @param {function} fn 	async function
     44 	 * @param {function} cb 	callback function, otherwise will return a promise
     45 	 * @param {Object} opts 	options
     46 	 */
     47 	acquire(key, fn, cb, opts) {
     48 		if (Array.isArray(key)) {
     49 			return this._acquireBatch(key, fn, cb, opts);
     50 		}
     51 
     52 		if (typeof (fn) !== 'function') {
     53 			throw new Error('You must pass a function to execute');
     54 		}
     55 
     56 		// faux-deferred promise using new Promise() (as Promise.defer is deprecated)
     57 		var deferredResolve = null;
     58 		var deferredReject = null;
     59 		var deferred = null;
     60 
     61 		if (typeof (cb) !== 'function') {
     62 			opts = cb;
     63 			cb = null;
     64 
     65 			// will return a promise
     66 			deferred = new this.Promise(function(resolve, reject) {
     67 				deferredResolve = resolve;
     68 				deferredReject = reject;
     69 			});
     70 		}
     71 
     72 		opts = opts || {};
     73 
     74 		var resolved = false;
     75 		var timer = null;
     76 		var occupationTimer = null;
     77 		var executionTimer = null;
     78 		var self = this;
     79 
     80 		var done = function (locked, err, ret) {
     81 
     82 			if (occupationTimer) {
     83 				clearTimeout(occupationTimer);
     84 				occupationTimer = null;
     85 			}
     86 
     87 			if (executionTimer) {
     88 				clearTimeout(executionTimer);
     89 				executionTimer = null;
     90 			}
     91 
     92 			if (locked) {
     93 				if (!!self.queues[key] && self.queues[key].length === 0) {
     94 					delete self.queues[key];
     95 				}
     96 				if (self.domainReentrant) {
     97 					delete self.domains[key];
     98 				}
     99 			}
    100 
    101 			if (!resolved) {
    102 				if (!deferred) {
    103 					if (typeof (cb) === 'function') {
    104 						cb(err, ret);
    105 					}
    106 				}
    107 				else {
    108 					//promise mode
    109 					if (err) {
    110 						deferredReject(err);
    111 					}
    112 					else {
    113 						deferredResolve(ret);
    114 					}
    115 				}
    116 				resolved = true;
    117 			}
    118 
    119 			if (locked) {
    120 				//run next func
    121 				if (!!self.queues[key] && self.queues[key].length > 0) {
    122 					self.queues[key].shift()();
    123 				}
    124 			}
    125 		};
    126 
    127 		var exec = function (locked) {
    128 			if (resolved) { // may due to timed out
    129 				return done(locked);
    130 			}
    131 
    132 			if (timer) {
    133 				clearTimeout(timer);
    134 				timer = null;
    135 			}
    136 
    137 			if (self.domainReentrant && locked) {
    138 				self.domains[key] = process.domain;
    139 			}
    140 
    141 			var maxExecutionTime = opts.maxExecutionTime || self.maxExecutionTime;
    142 			if (maxExecutionTime) {
    143 				executionTimer = setTimeout(function () {
    144 					if (!!self.queues[key]) {
    145 						done(locked, new Error('Maximum execution time is exceeded ' + key));
    146 					}
    147 				}, maxExecutionTime);
    148 			}
    149 
    150 			// Callback mode
    151 			if (fn.length === 1) {
    152 				var called = false;
    153 				try {
    154 					fn(function (err, ret) {
    155 						if (!called) {
    156 							called = true;
    157 							done(locked, err, ret);
    158 						}
    159 					});
    160 				} catch (err) {
    161 					// catching error thrown in user function fn
    162 					if (!called) {
    163 						called = true;
    164 						done(locked, err);
    165 					}
    166 				}
    167 			}
    168 			else {
    169 				// Promise mode
    170 				self._promiseTry(function () {
    171 					return fn();
    172 				})
    173 				.then(function(ret){
    174 					done(locked, undefined, ret);
    175 				}, function(error){
    176 					done(locked, error);
    177 				});
    178 			}
    179 		};
    180 
    181 		if (self.domainReentrant && !!process.domain) {
    182 			exec = process.domain.bind(exec);
    183 		}
    184 
    185 		var maxPending = opts.maxPending || self.maxPending;
    186 
    187 		if (!self.queues[key]) {
    188 			self.queues[key] = [];
    189 			exec(true);
    190 		}
    191 		else if (self.domainReentrant && !!process.domain && process.domain === self.domains[key]) {
    192 			// If code is in the same domain of current running task, run it directly
    193 			// Since lock is re-enterable
    194 			exec(false);
    195 		}
    196 		else if (self.queues[key].length >= maxPending) {
    197 			done(false, new Error('Too many pending tasks in queue ' + key));
    198 		}
    199 		else {
    200 			var taskFn = function () {
    201 				exec(true);
    202 			};
    203 			if (opts.skipQueue) {
    204 				self.queues[key].unshift(taskFn);
    205 			} else {
    206 				self.queues[key].push(taskFn);
    207 			}
    208 
    209 			var timeout = opts.timeout || self.timeout;
    210 			if (timeout) {
    211 				timer = setTimeout(function () {
    212 					timer = null;
    213 					done(false, new Error('async-lock timed out in queue ' + key));
    214 				}, timeout);
    215 			}
    216 		}
    217 
    218 		var maxOccupationTime = opts.maxOccupationTime || self.maxOccupationTime;
    219 			if (maxOccupationTime) {
    220 				occupationTimer = setTimeout(function () {
    221 					if (!!self.queues[key]) {
    222 						done(false, new Error('Maximum occupation time is exceeded in queue ' + key));
    223 					}
    224 				}, maxOccupationTime);
    225 			}
    226 
    227 		if (deferred) {
    228 			return deferred;
    229 		}
    230 	};
    231 
    232 	/*
    233 	 * Below is how this function works:
    234 	 *
    235 	 * Equivalent code:
    236 	 * self.acquire(key1, function(cb){
    237 	 *     self.acquire(key2, function(cb){
    238 	 *         self.acquire(key3, fn, cb);
    239 	 *     }, cb);
    240 	 * }, cb);
    241 	 *
    242 	 * Equivalent code:
    243 	 * var fn3 = getFn(key3, fn);
    244 	 * var fn2 = getFn(key2, fn3);
    245 	 * var fn1 = getFn(key1, fn2);
    246 	 * fn1(cb);
    247 	 */
    248 	_acquireBatch(keys, fn, cb, opts) {
    249 		if (typeof (cb) !== 'function') {
    250 			opts = cb;
    251 			cb = null;
    252 		}
    253 
    254 		var self = this;
    255 		var getFn = function (key, fn) {
    256 			return function (cb) {
    257 				self.acquire(key, fn, cb, opts);
    258 			};
    259 		};
    260 
    261 		var fnx = keys.reduceRight(function (prev, key) {
    262 			return getFn(key, prev);
    263 		}, fn);
    264 
    265 		if (typeof (cb) === 'function') {
    266 			fnx(cb);
    267 		}
    268 		else {
    269 			return new this.Promise(function (resolve, reject) {
    270 				// check for promise mode in case keys is empty array
    271 				if (fnx.length === 1) {
    272 					fnx(function (err, ret) {
    273 						if (err) {
    274 							reject(err);
    275 						}
    276 						else {
    277 							resolve(ret);
    278 						}
    279 					});
    280 				} else {
    281 					resolve(fnx());
    282 				}
    283 			});
    284 		}
    285 	};
    286 
    287 	/*
    288 	 *	Whether there is any running or pending asyncFunc
    289 	 *
    290 	 *	@param {String} key
    291 	 */
    292 	isBusy(key) {
    293 		if (!key) {
    294 			return Object.keys(this.queues).length > 0;
    295 		}
    296 		else {
    297 			return !!this.queues[key];
    298 		}
    299 	};
    300 
    301 	/**
    302 	 * Promise.try() implementation to become independent of Q-specific methods
    303 	 */
    304 	_promiseTry(fn) {
    305 		try {
    306 			return this.Promise.resolve(fn());
    307 		} catch (e) {
    308 			return this.Promise.reject(e);
    309 		}
    310 	};
    311 }