async-lock.js (7109B)
const DEFAULT_TIMEOUT = 0; // Never
const DEFAULT_MAX_OCCUPATION_TIME = 0; // Never
const DEFAULT_MAX_EXECUTION_TIME = 0; // Never
const DEFAULT_MAX_PENDING = 1000;

/**
 * Keyed asynchronous lock.
 *
 * Tasks acquired under the same key run one at a time, in queue order;
 * tasks under different keys do not block each other.
 */
export class AsyncLock {
  /**
   * @param {Object} [opts]
   * @param {Function} [opts.Promise] Promise constructor used for returned promises (defaults to the global Promise).
   * @param {boolean} [opts.domainReentrant] Let code running in the lock holder's `process.domain` re-enter the lock.
   * @param {number} [opts.timeout] Max milliseconds a task may wait in the queue (0 = never time out).
   * @param {number} [opts.maxOccupationTime] Max milliseconds from the `acquire` call until completion (0 = unlimited).
   * @param {number} [opts.maxExecutionTime] Max milliseconds from the start of `fn` until completion (0 = unlimited).
   * @param {number} [opts.maxPending] Max queued tasks per key; a non-negative integer or Infinity, otherwise 1000.
   * @throws {Error} If `domainReentrant` is requested but `process.domain` is unavailable.
   */
  constructor(opts) {
    opts = opts || {};

    this.Promise = opts.Promise || Promise;

    // format: {key : [fn, fn]}
    // queues[key] being absent indicates no job running for key
    this.queues = Object.create(null);

    // lock is reentrant for same domain
    this.domainReentrant = opts.domainReentrant || false;
    if (this.domainReentrant) {
      if (typeof process === 'undefined' || typeof process.domain === 'undefined') {
        throw new Error(
          'Domain-reentrant locks require `process.domain` to exist. Please flip `opts.domainReentrant = false`, ' +
          'use a NodeJS version that still implements Domain, or install a browser polyfill.');
      }
      // domain of current running func {key : domain}
      this.domains = Object.create(null);
    }

    this.timeout = opts.timeout || DEFAULT_TIMEOUT;
    this.maxOccupationTime = opts.maxOccupationTime || DEFAULT_MAX_OCCUPATION_TIME;
    this.maxExecutionTime = opts.maxExecutionTime || DEFAULT_MAX_EXECUTION_TIME;

    const isValidMaxPending = opts.maxPending === Infinity ||
      (Number.isInteger(opts.maxPending) && opts.maxPending >= 0);
    this.maxPending = isValidMaxPending ? opts.maxPending : DEFAULT_MAX_PENDING;
  }

  /**
   * Acquire Locks
   *
   * `fn` is treated as callback-style when it declares exactly one parameter
   * (it receives a node-style `done(err, ret)`); otherwise it is called with
   * no arguments and its return value is resolved as a promise.
   *
   * @param {String|Array} key resource key or keys to lock
   * @param {function} fn async function
   * @param {function} cb callback function, otherwise will return a promise
   * @param {Object} opts options (`timeout`, `maxOccupationTime`, `maxExecutionTime`,
   *   `maxPending`, `skipQueue`) overriding the instance settings for this call
   * @returns {Promise|undefined} a promise when no callback is given
   * @throws {Error} If `fn` is not a function.
   */
  acquire(key, fn, cb, opts) {
    if (Array.isArray(key)) {
      return this._acquireBatch(key, fn, cb, opts);
    }

    if (typeof fn !== 'function') {
      throw new Error('You must pass a function to execute');
    }

    // faux-deferred promise using new Promise() (as Promise.defer is deprecated)
    let deferredResolve = null;
    let deferredReject = null;
    let deferred = null;

    if (typeof cb !== 'function') {
      opts = cb;
      cb = null;

      // will return a promise
      deferred = new this.Promise((resolve, reject) => {
        deferredResolve = resolve;
        deferredReject = reject;
      });
    }

    opts = opts || {};

    let resolved = false; // caller has been notified (callback called / promise settled)
    let released = false; // this acquisition has already handed the lock on
    let timer = null;
    let occupationTimer = null;
    let executionTimer = null;

    const done = (locked, err, ret) => {
      if (occupationTimer) {
        clearTimeout(occupationTimer);
        occupationTimer = null;
      }

      if (executionTimer) {
        clearTimeout(executionTimer);
        executionTimer = null;
      }

      // Release at most once. After an execution timeout the lock has already
      // been passed on; a late completion of `fn` must not release it again,
      // or it would tear down the queue under the next holder.
      const releasing = locked && !released;
      if (releasing) {
        released = true;
        if (this.queues[key] && this.queues[key].length === 0) {
          delete this.queues[key];
        }
        if (this.domainReentrant) {
          delete this.domains[key];
        }
      }

      if (!resolved) {
        if (!deferred) {
          if (typeof cb === 'function') {
            cb(err, ret);
          }
        } else if (err) {
          // promise mode
          deferredReject(err);
        } else {
          deferredResolve(ret);
        }
        resolved = true;
      }

      if (releasing) {
        // run next func
        if (this.queues[key] && this.queues[key].length > 0) {
          this.queues[key].shift()();
        }
      }
    };

    let exec = (locked) => {
      if (resolved) { // may be due to timed out
        return done(locked);
      }

      if (timer) {
        clearTimeout(timer);
        timer = null;
      }

      if (this.domainReentrant && locked) {
        this.domains[key] = process.domain;
      }

      const maxExecutionTime = opts.maxExecutionTime || this.maxExecutionTime;
      if (maxExecutionTime) {
        executionTimer = setTimeout(() => {
          if (this.queues[key]) {
            done(locked, new Error('Maximum execution time is exceeded ' + key));
          }
        }, maxExecutionTime);
      }

      if (fn.length === 1) {
        // Callback mode
        let called = false;
        try {
          fn((err, ret) => {
            if (!called) {
              called = true;
              done(locked, err, ret);
            }
          });
        } catch (err) {
          // catching error thrown in user function fn
          if (!called) {
            called = true;
            done(locked, err);
          }
        }
      } else {
        // Promise mode
        this._promiseTry(() => fn()).then(
          (ret) => {
            done(locked, undefined, ret);
          },
          (error) => {
            done(locked, error);
          }
        );
      }
    };

    if (this.domainReentrant && !!process.domain) {
      exec = process.domain.bind(exec);
    }

    // An explicit per-call 0 is honoured (the constructor accepts 0 as well);
    // any other falsy value falls back to the instance setting.
    const maxPending = opts.maxPending === 0 ? 0 : (opts.maxPending || this.maxPending);

    if (!this.queues[key]) {
      this.queues[key] = [];
      exec(true);
    } else if (this.domainReentrant && !!process.domain && process.domain === this.domains[key]) {
      // If code is in the same domain of current running task, run it directly
      // Since lock is re-enterable
      exec(false);
    } else if (this.queues[key].length >= maxPending) {
      done(false, new Error('Too many pending tasks in queue ' + key));
    } else {
      const taskFn = () => {
        exec(true);
      };
      if (opts.skipQueue) {
        this.queues[key].unshift(taskFn);
      } else {
        this.queues[key].push(taskFn);
      }

      const timeout = opts.timeout || this.timeout;
      if (timeout) {
        timer = setTimeout(() => {
          timer = null;
          done(false, new Error('async-lock timed out in queue ' + key));
        }, timeout);
      }
    }

    // Only arm the occupation timer while the call is still unsettled; if the
    // task already finished (or was rejected) synchronously above, nothing
    // would ever clear the timer.
    const maxOccupationTime = opts.maxOccupationTime || this.maxOccupationTime;
    if (maxOccupationTime && !resolved) {
      occupationTimer = setTimeout(() => {
        if (this.queues[key]) {
          done(false, new Error('Maximum occupation time is exceeded in queue ' + key));
        }
      }, maxOccupationTime);
    }

    if (deferred) {
      return deferred;
    }
    return undefined;
  }

  /*
   * Acquire several keys by nesting single-key acquisitions, in array order.
   *
   * Below is how this function works:
   *
   * Equivalent code:
   * self.acquire(key1, function(cb){
   *     self.acquire(key2, function(cb){
   *         self.acquire(key3, fn, cb);
   *     }, cb);
   * }, cb);
   *
   * Equivalent code:
   * var fn3 = getFn(key3, fn);
   * var fn2 = getFn(key2, fn3);
   * var fn1 = getFn(key1, fn2);
   * fn1(cb);
   */
  _acquireBatch(keys, fn, cb, opts) {
    if (typeof cb !== 'function') {
      opts = cb;
      cb = null;
    }

    // Wraps `inner` so that it only runs once `key` is held. The wrapper
    // declares one parameter, so `acquire` treats it as callback-style.
    const getFn = (key, inner) => (next) => {
      this.acquire(key, inner, next, opts);
    };

    const fnx = keys.reduceRight((inner, key) => getFn(key, inner), fn);

    if (typeof cb === 'function') {
      fnx(cb);
      return undefined;
    }

    return new this.Promise((resolve, reject) => {
      // check for promise mode in case keys is empty array
      if (fnx.length === 1) {
        fnx((err, ret) => {
          if (err) {
            reject(err);
          } else {
            resolve(ret);
          }
        });
      } else {
        resolve(fnx());
      }
    });
  }

  /*
   * Whether there is any running or pending asyncFunc
   *
   * @param {String} key when falsy, reports on all keys
   */
  isBusy(key) {
    if (!key) {
      return Object.keys(this.queues).length > 0;
    }
    return !!this.queues[key];
  }

  /**
   * Promise.try() implementation to become independent of Q-specific methods
   *
   * @param {function} fn function to call; a synchronous throw becomes a rejection
   * @returns {Promise} promise for the result of `fn`
   */
  _promiseTry(fn) {
    try {
      return this.Promise.resolve(fn());
    } catch (e) {
      return this.Promise.reject(e);
    }
  }
}