下面通过3篇讲解promise.all并发限制
Promise.all并发限制
背景
通常,我们在需要保证代码在多个异步处理之后执行,会用到:
Promise.all(promises: []).then(fun: function);
Promise.all可以保证,promises数组中所有promise对象都达到resolve状态,才执行then回调。
这时候考虑一个场景:如果你的promises数组中每个对象都是http请求,或者说每个对象包含了复杂的调用处理。而这样的对象有几十万个。
那么会出现的情况是,你在瞬间发出几十万http请求(tcp连接数不足可能造成等待),或者堆积了无数调用栈导致内存溢出。
这时候,我们就需要考虑对Promise.all做并发限制。
Promise.all并发限制指的是,每个时刻并发执行的promise数量是固定的,最终的执行结果还是保持与原来的Promise.all一致。
实现
我们知道,promise并不是因为调用Promise.all才执行,而是在实例化promise对象的时候就执行了,在理解这一点的基础上,要实现并发限制,只能从promise实例化上下手。
换句话说,就是把生成promises数组的控制权,交给并发控制逻辑。
这里我并不打算一步步实现这个这个功能,npm中有很多实现这个功能的第三方包,比如async-pool、es6-promise-pool、p-limit,这里我直接拿async-pool的代码来分析一下实现原理。
代码很简单,去掉不必要的代码,加上自己的注释,大概内容如下:
function asyncPool(poolLimit, array, iteratorFn) { let i = 0; const ret = []; const executing = []; const enqueue = function () { // 边界处理,array为空数组 if (i === array.length) { return Promise.resolve(); } // 每调一次enqueue,初始化一个promise const item = array[i++]; const p = Promise.resolve().then(() => iteratorFn(item, array)); // 放入promises数组 ret.push(p); // promise执行完毕,从executing数组中删除 const e = p.then(() => executing.splice(executing.indexOf(e), 1)); // 插入executing数字,表示正在执行的promise executing.push(e); // 使用Promise.rece,每当executing数组中promise数量低于poolLimit,就实例化新的promise并执行 let r = Promise.resolve(); if (executing.length >= poolLimit) { r = Promise.race(executing); } // 递归,直到遍历完array return r.then(() => enqueue()); }; return enqueue().then(() => Promise.all(ret)); }
因为是promise加上递归,所以在代码注释上不太好标注执行顺序,但是大概的逻辑可以总结为:
- 1.从array第1个元素开始,初始化promise对象,同时用一个executing数组保存正在执行的promise
- 2.不断初始化promise,直到达到poolLimt
- 3.使用Promise.race,获得executing中promise的执行情况,当有一个promise执行完毕,继续初始化promise并放入executing中
- 4.所有promise都执行完了,调用Promise.all返回
使用方式就是:
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i)); return asyncPool(2, [1000, 5000, 3000, 2000], timeout).then(results => { ... });
面试题:
- 有 8 个图片资源的 url,已经存储在数组 urls 中(即urls = [‘http://example.com/1.jpg’, …., ‘http://example.com/8.jpg’]),而且已经有一个函数 function loadImg,输入一个 url 链接,返回一个 Promise,该 Promise 在图片下载完成的时候 resolve,下载失败则 reject。但是我们要求,任意时刻,同时下载的链接数量不可以超过 3 个。请写一段代码实现这个需求,要求尽可能快速地将所有图片下载完成。
- 已有代码如下:
var urls = [ 'https://www.kkkk1000.com/images/getImgData/getImgDatadata.jpg', 'https://www.kkkk1000.com/images/getImgData/gray.gif', 'https://www.kkkk1000.com/images/getImgData/Particle.gif', 'https://www.kkkk1000.com/images/getImgData/arithmetic.png', 'https://www.kkkk1000.com/images/getImgData/arithmetic2.gif', 'https://www.kkkk1000.com/images/getImgData/getImgDataError.jpg', 'https://www.kkkk1000.com/images/getImgData/arithmetic.gif', 'https://www.kkkk1000.com/images/wxQrCode2.png' ]; function loadImg(url) { return new Promise((resolve, reject) => { const img = new Image() img.onload = function () { console.log('一张图片加载完成'); resolve(); } img.onerror = reject img.src = url }) };
思路:
用 Promise.race来实现,先并发请求3个图片资源,这样可以得到 3 个 Promise实例,组成一个数组promises ,然后不断的调用 Promise.race 来返回最快改变状态的 Promise,然后从数组(promises )中删掉这个 Promise 对象实例,再加入一个新的 Promise实例,直到全部的 url 被取完。
//省略代码 function limitLoad(urls, handler, limit) { // 对数组做一个拷贝 const sequence = [].concat(urls) let promises = []; //并发请求到最大数 promises = sequence.splice(0, limit).map((url, index) => { // 这里返回的 index 是任务在 promises 的脚标, //用于在 Promise.race 之后找到完成的任务脚标 return handler(url).then(() => { return index }); }); (async function loop() { let p = Promise.race(promises); for (let i = 0; i < sequence.length; i++) { p = p.then((res) => { promises[res] = handler(sequence[i]).then(() => { return res }); return Promise.race(promises) }) } })() } limitLoad(urls, loadImg, 3)
参考