并行限制的Promise
在 JavaScript 中实现 并行限制的 Promise(即控制同时运行的异步任务数量)主要有以下几种方式:
# 一、基于调度器类(Scheduler)的队列管理
核心思想:通过维护任务队列和当前运行任务计数器,动态调度新任务。
实现步骤:
- 定义
Scheduler
类,包含任务队列、最大并发数、当前运行任务计数器。 - 通过
add
方法添加任务,触发任务调度。 - 使用递归或循环,在任务完成时从队列中取出下一个任务执行。
class Scheduler {
constructor(max) {
this.max = max;
this.queue = [];
this.running = 0;
}
add(promiseCreator) {
this.queue.push(promiseCreator);
this.run();
}
run() {
while (this.running < this.max && this.queue.length) {
const task = this.queue.shift();
this.running++;
task().finally(() => {
this.running--;
this.run();
});
}
}
}
特点:
- 灵活控制队列,支持动态添加任务。
- 每个任务完成后自动触发下一个任务(类似“接力”机制)。
函数实现
function createScheduler(max) {
let queue = [];
let running = 0;
function run() {
while (running < max && queue.length) {
const task = queue.shift();
running++;
task().finally(() => {
running--;
run();
});
}
}
return function (promiseCreator) {
queue.push(promiseCreator);
run();
};
}
改成两个参数的版本
// 版本1:基础执行版(不收集结果)
function concurrentRun(limit = 2, promises) {
let queue = [...promises];
let running = 0;
function runner() {
while (running < limit && queue.length) {
const task = queue.shift();
running++;
task().finally(() => {
running--;
runner();
});
}
}
runner();
}
// 版本2:Promise 结果收集版
function concurrentRunWithResults(limit, promises) {
return new Promise((resolve) => {
const results = new Array(promises.length);
let queue = promises.map((fn, i) => ({ fn, index: i }));
let running = 0;
let completed = 0;
async function runner() {
while (running < limit && queue.length) {
const { fn, index } = queue.shift();
running++;
try {
results[index] = await fn();
} catch (e) {
results[index] = e;
} finally {
running--;
completed++;
if (completed === promises.length) {
resolve(results);
} else {
runner();
}
}
}
}
runner();
});
}
# 二、递归 + 分批并发(Promise.all 分片)
核心思想:将任务数组按批次切割,每批用 Promise.all
执行,递归处理剩余批次。
实现步骤:
- 将任务数组按
max
值分片。 - 对每片任务执行
Promise.all
,递归处理下一片。
代码示例(参考 [1] [5]):
async function batchRun(tasks, max) {
let results = [];
for (let i = 0; i < tasks.length; i += max) {
const batch = tasks.slice(i, i + max);
results = results.concat(await Promise.all(batch.map(fn => fn())));
}
return results;
}
特点:
- 实现简单,但无法动态调整并发数。
- 任务必须预先全部加载,无法中途添加。
# 三、Promise.race 动态控制并发池
核心思想:利用 Promise.race
监听并发池中的任务完成事件,动态替换新任务。
实现步骤:
- 初始化并发池(数组),填充初始任务。
- 使用
Promise.race
监听池内任务,任一完成则替换新任务。
代码示例(参考 [5] [8]):
async function parallelLimit(tasks, max) {
const pool = new Set();
for (const task of tasks) {
const promise = task().then(() => pool.delete(promise));
pool.add(promise);
if (pool.size >= max) await Promise.race(pool);
}
await Promise.all(pool);
}
特点:
- 动态维持最大并发数,无需预分片。
- 内存占用较高(需维护池对象)。
# 四、生成器函数 + 异步迭代
核心思想:利用生成器函数逐个产出任务,通过异步迭代控制执行节奏。
代码示例(参考 [7]):
async function* taskGenerator(tasks) {
for (const task of tasks) yield task();
}
async function runWithLimit(tasks, max) {
const iterator = taskGenerator(tasks);
const workers = Array(max).fill(iterator).map(async (it) => {
for await (const result of it) console.log(result);
});
await Promise.all(workers);
}
特点:
- 代码可读性强,适合复杂任务流。
- 需要 ES2018 的
for await...of
支持。
# 五、第三方库(推荐)
常用库:
p-limit
:轻量级库,支持链式调用([5])。tiny-async-pool
:极简实现,核心代码仅 10 行([5])。
使用示例:
import pLimit from 'p-limit';
const limit = pLimit(2); // 最大并发数 2
const tasks = [/* 异步函数数组 */];
const results = await Promise.all(tasks.map(task => limit(task)));
# 关键对比
方法 | 动态添加任务 | 预加载任务 | 实现复杂度 | 适用场景 |
---|---|---|---|---|
调度器类 | ✔️ | ❌ | 中等 | 需要灵活控制的长期任务 |
递归分片 | ❌ | ✔️ | 简单 | 固定任务数组 |
Promise.race 动态池 | ✔️ | ✔️ | 中等 | 高实时性任务 |
生成器迭代 | ✔️ | ✔️ | 较高 | 复杂任务流 |
第三方库 | ✔️ | ✔️ | 低 | 生产环境 |
# 选择建议
- 简单场景:使用
递归分片
或第三方库
(如p-limit
)。 - 动态任务:选择
调度器类
或Promise.race 动态池
。 - 错误处理:注意在任务失败时是否需要终止整个流程(
Promise.all
会整体失败,Promise.allSettled
更灵活,参考 [1])。
以上方案均可根据实际需求调整错误处理、任务优先级等逻辑。
编辑 (opens new window)
上次更新: 2025/03/17, 12:21:00