promise.all并发限制

下面通过3篇讲解promise.all并发限制

Promise.all并发限制

背景

  • 通常,我们在需要保证代码在多个异步处理之后执行,会用到:

      Promise.all(promises: []).then(fun: function);
  • Promise.all可以保证,promises数组中所有promise对象都达到resolve状态,才执行then回调。

  • 这时候考虑一个场景:如果你的promises数组中每个对象都是http请求,或者说每个对象包含了复杂的调用处理。而这样的对象有几十万个。

  • 那么会出现的情况是,你在瞬间发出几十万http请求(tcp连接数不足可能造成等待),或者堆积了无数调用栈导致内存溢出。

  • 这时候,我们就需要考虑对Promise.all做并发限制。

  • Promise.all并发限制指的是,每个时刻并发执行的promise数量是固定的,最终的执行结果还是保持与原来的Promise.all一致。

实现

  • 我们知道,promise并不是因为调用Promise.all才执行,而是在实例化promise对象的时候就执行了,在理解这一点的基础上,要实现并发限制,只能从promise实例化上下手。

  • 换句话说,就是把生成promises数组的控制权,交给并发控制逻辑。

  • 这里我并不打算一步步实现这个这个功能,npm中有很多实现这个功能的第三方包,比如async-pool、es6-promise-pool、p-limit,这里我直接拿async-pool的代码来分析一下实现原理。

  • 代码很简单,去掉不必要的代码,加上自己的注释,大概内容如下:

      function asyncPool(poolLimit, array, iteratorFn) {
          let i = 0;
          const ret = [];
          const executing = [];
          const enqueue = function () {
              // 边界处理,array为空数组
              if (i === array.length) {
                  return Promise.resolve();
              }
              // 每调一次enqueue,初始化一个promise
              const item = array[i++];
              const p = Promise.resolve().then(() => iteratorFn(item, array));
              // 放入promises数组
              ret.push(p);
              // promise执行完毕,从executing数组中删除
              const e = p.then(() => executing.splice(executing.indexOf(e), 1));
              // 插入executing数字,表示正在执行的promise
              executing.push(e);
              // 使用Promise.rece,每当executing数组中promise数量低于poolLimit,就实例化新的promise并执行
              let r = Promise.resolve();
              if (executing.length >= poolLimit) {
                  r = Promise.race(executing);
              }
              // 递归,直到遍历完array
              return r.then(() => enqueue());
          };
          return enqueue().then(() => Promise.all(ret));
      }
  • 因为是promise加上递归,所以在代码注释上不太好标注执行顺序,但是大概的逻辑可以总结为:

    • 1.从array第1个元素开始,初始化promise对象,同时用一个executing数组保存正在执行的promise
    • 2.不断初始化promise,直到达到poolLimt
    • 3.使用Promise.race,获得executing中promise的执行情况,当有一个promise执行完毕,继续初始化promise并放入executing中
    • 4.所有promise都执行完了,调用Promise.all返回
  • 使用方式就是:

      const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
      return asyncPool(2, [1000, 5000, 3000, 2000], timeout).then(results => {
          ...
      });

面试题:

  • 有 8 个图片资源的 url,已经存储在数组 urls 中(即urls = [‘http://example.com/1.jpg’, …., ‘http://example.com/8.jpg’]),而且已经有一个函数 function loadImg,输入一个 url 链接,返回一个 Promise,该 Promise 在图片下载完成的时候 resolve,下载失败则 reject。但是我们要求,任意时刻,同时下载的链接数量不可以超过 3 个。请写一段代码实现这个需求,要求尽可能快速地将所有图片下载完成。
  • 已有代码如下:
      var urls = [
      'https://www.kkkk1000.com/images/getImgData/getImgDatadata.jpg', 
      'https://www.kkkk1000.com/images/getImgData/gray.gif', 
      'https://www.kkkk1000.com/images/getImgData/Particle.gif', 
      'https://www.kkkk1000.com/images/getImgData/arithmetic.png', 
      'https://www.kkkk1000.com/images/getImgData/arithmetic2.gif', 
      'https://www.kkkk1000.com/images/getImgData/getImgDataError.jpg', 
      'https://www.kkkk1000.com/images/getImgData/arithmetic.gif', 
      'https://www.kkkk1000.com/images/wxQrCode2.png'
      ];
      function loadImg(url) {
          return new Promise((resolve, reject) => {
              const img = new Image()
              img.onload = function () {
                  console.log('一张图片加载完成');
                  resolve();
              }
              img.onerror = reject
              img.src = url
          })
      };

思路:

  • 用 Promise.race来实现,先并发请求3个图片资源,这样可以得到 3 个 Promise实例,组成一个数组promises ,然后不断的调用 Promise.race 来返回最快改变状态的 Promise,然后从数组(promises )中删掉这个 Promise 对象实例,再加入一个新的 Promise实例,直到全部的 url 被取完。

      //省略代码
      function limitLoad(urls, handler, limit) {
          // 对数组做一个拷贝
          const sequence = [].concat(urls)
          let promises = [];
    
          //并发请求到最大数
          promises = sequence.splice(0, limit).map((url, index) => {
              // 这里返回的 index 是任务在 promises 的脚标,
              //用于在 Promise.race 之后找到完成的任务脚标
              return handler(url).then(() => {
                  return index
              });
          });
    
          (async function loop() {
              let p = Promise.race(promises);
              for (let i = 0; i < sequence.length; i++) {
                  p = p.then((res) => {
                      promises[res] = handler(sequence[i]).then(() => {
                          return res
                      });
                      return Promise.race(promises)
                  })
              }
          })()
      }
      limitLoad(urls, loadImg, 3)
  • 参考


   转载规则


《promise.all并发限制》 朝飞 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
理解和使用Promise.all和Promise.race 理解和使用Promise.all和Promise.race
一、Pomise.all的使用 Promise.all可以将多个Promise实例包装成一个新的Promise实例。同时,成功和失败的返回值是不同的,成功的时候返回的是一个结果数组,而失败的时候则返回最先被reject失败状态的值。 具体
2020-02-19
本篇 
promise.all并发限制 promise.all并发限制
下面通过3篇讲解promise.all并发限制Promise.all并发限制背景 通常,我们在需要保证代码在多个异步处理之后执行,会用到: Promise.all(promises: []).then(fun: function);
2020-02-19
  目录