Skip to content

在 JavaScript 中实现 并行限制的 Promise(即控制同时运行的异步任务数量)主要有以下几种方式:

<!-- more -->


一、基于调度器类(Scheduler)的队列管理

核心思想:通过维护任务队列和当前运行任务计数器,动态调度新任务。
实现步骤

  1. 定义 Scheduler 类,包含任务队列、最大并发数、当前运行任务计数器。
  2. 通过 add 方法添加任务,触发任务调度。
  3. 使用递归或循环,在任务完成时从队列中取出下一个任务执行。
javascript
class Scheduler {
  constructor(max) {
    this.max = max;
    this.queue = [];
    this.running = 0;
  }
  add(promiseCreator) {
    this.queue.push(promiseCreator);
    this.run();
  }
  run() {
    while (this.running < this.max && this.queue.length) {
      const task = this.queue.shift();
      this.running++;
      task().finally(() => {
        this.running--;
        this.run();
      });
    }
  }
}

特点

  • 灵活控制队列,支持动态添加任务。
  • 每个任务完成后自动触发下一个任务(类似“接力”机制)。

函数实现

js
function createScheduler(max) {
  let queue = [];
  let running = 0;

  function run() {
    while (running < max && queue.length) {
      const task = queue.shift();
      running++;
      task().finally(() => {
        running--;
        run();
      });
    }
  }

  return function (promiseCreator) {
    queue.push(promiseCreator);
    run();
  };
}

改成两个参数的版本

js
// 版本1:基础执行版(不收集结果)
function concurrentRun(limit = 2, promises) {
  let queue = [...promises];
  let running = 0;

  function runner() {
    while (running < limit && queue.length) {
      const task = queue.shift();
      running++;
      task().finally(() => {
        running--;
        runner();
      });
    }
  }

  runner();
}

// 版本2:Promise 结果收集版
function concurrentRunWithResults(limit, promises) {
  return new Promise((resolve) => {
    const results = new Array(promises.length);
    let queue = promises.map((fn, i) => ({ fn, index: i }));
    let running = 0;
    let completed = 0;

    async function runner() {
      while (running < limit && queue.length) {
        const { fn, index } = queue.shift();
        running++;
        
        try {
          results[index] = await fn();
        } catch (e) {
          results[index] = e;
        } finally {
          running--;
          completed++;
          
          if (completed === promises.length) {
            resolve(results);
          } else {
            runner();
          }
        }
      }
    }

    runner();
  });
}

20250311171447


二、递归 + 分批并发(Promise.all 分片)

核心思想:将任务数组按批次切割,每批用 Promise.all 执行,递归处理剩余批次。
实现步骤

  1. 将任务数组按 max 值分片。
  2. 对每片任务执行 Promise.all,递归处理下一片。

代码示例(参考 [1] [5]):

javascript
async function batchRun(tasks, max) {
  let results = [];
  for (let i = 0; i < tasks.length; i += max) {
    const batch = tasks.slice(i, i + max);
    results = results.concat(await Promise.all(batch.map(fn => fn())));
  }
  return results;
}

特点

  • 实现简单,但无法动态调整并发数。
  • 任务必须预先全部加载,无法中途添加。

三、Promise.race 动态控制并发池

核心思想:利用 Promise.race 监听并发池中的任务完成事件,动态替换新任务。
实现步骤

  1. 初始化并发池(数组),填充初始任务。
  2. 使用 Promise.race 监听池内任务,任一完成则替换新任务。

代码示例(参考 [5] [8]):

javascript
async function parallelLimit(tasks, max) {
  const pool = new Set();
  for (const task of tasks) {
    const promise = task().then(() => pool.delete(promise));
    pool.add(promise);
    if (pool.size >= max) await Promise.race(pool);
  }
  await Promise.all(pool);
}

特点

  • 动态维持最大并发数,无需预分片。
  • 内存占用较高(需维护池对象)。

四、生成器函数 + 异步迭代

核心思想:利用生成器函数逐个产出任务,通过异步迭代控制执行节奏。
代码示例(参考 [7]):

javascript
async function* taskGenerator(tasks) {
  for (const task of tasks) yield task();
}
async function runWithLimit(tasks, max) {
  const iterator = taskGenerator(tasks);
  const workers = Array(max).fill(iterator).map(async (it) => {
    for await (const result of it) console.log(result);
  });
  await Promise.all(workers);
}

特点

  • 代码可读性强,适合复杂任务流。
  • 需要 ES2018 的 for await...of 支持。

五、第三方库(推荐)

常用库

  1. p-limit:轻量级库,支持链式调用([5])。
  2. tiny-async-pool:极简实现,核心代码仅 10 行([5])。

使用示例

javascript
import pLimit from 'p-limit';
const limit = pLimit(2); // 最大并发数 2
const tasks = [/* 异步函数数组 */];
const results = await Promise.all(tasks.map(task => limit(task)));

关键对比

方法动态添加任务预加载任务实现复杂度适用场景
调度器类✔️中等需要灵活控制的长期任务
递归分片✔️简单固定任务数组
Promise.race 动态池✔️✔️中等高实时性任务
生成器迭代✔️✔️较高复杂任务流
第三方库✔️✔️生产环境

选择建议

  • 简单场景:使用 递归分片第三方库(如 p-limit)。
  • 动态任务:选择 调度器类Promise.race 动态池
  • 错误处理:注意在任务失败时是否需要终止整个流程(Promise.all 会整体失败,Promise.allSettled 更灵活,参考 [1])。

以上方案均可根据实际需求调整错误处理、任务优先级等逻辑。