js异步并发控制

    科技2024-11-30  7

    原地址

    一个极端业务场景引发的思考

    在业务开发过程中,我们经常会遇到多个异步任务并发执行的情况,待所有异步任务结束之后再执行我们的业务逻辑。 通常情况下,我们会采用 ES6 标准下的Promise.all([promise1, promise2, promise3,…]).then( )方法来应对这样的场景需求, Promise.all 可以保证,promises 数组中所有 promise 对象都达到 resolve 状态,才执行 then 回调。 这样的 promise 对象可能是你发出的 http 请求,亦或是普通的库表查询操作,看起来平平常常没什么了不起,但是如果这样的 promise 不是一次来个三五个,而是成百上千个地同时出现,不好意思,小船说翻就翻。为什么?因为你在瞬间发出了大量的 http 请求(tcp 连接数不足可能造成等待),或者堆积了无数调用栈导致内存溢出。 这个时候,我们就需要对 Promise.all做出限制了,我们要限制单次并发量,但最终的执行结果还是和常规的 Promise.all保持一致。

    并发控制的实现思路

    从工具开发者的角度来思考,我们不能限制用户想要执行的异步任务数量,但是我们可以规定单次并发执行的 promise 数量,更进一步讲就是控制 promise 的实例化数量,以规避高并发带来的种种问题。当本次 promise 全部 resolve 或者有单个 promise 最先达到 resolve 状态,再将余下的 promise 依次放入队列。 GitHub 上有不少针对该功能实现的开源项目,并已经发布到 npm 上,比如,async-pool、p-limit 以及功能比较丰富的 es6-promise-pool,在这里我们选取前 2 个项目来进行简要分析。

    实现与分析

    1、async-pool

    asyncPool 提供了两种实现,一种是基于 ES6 标准的 Promise,另外一种是利用 ES7 的 async 函数来实现。

    (1)asyncPool-es6.js

    /** * * @param { 并发限制 } poolLimit * @param { promise 数组 } array * @param { callback } iteratorFn */ function asyncPool(poolLimit, array, iteratorFn) { let i = 0 const ret = [] const executing = [] const enqueue = function () { // ① 边界条件,array 为空或者 promise 都已达到 resolve 状态 if (i === array.length) { return Promise.resolve() } const item = array[i++] // ② 生成一个 promise 实例,并在 then 方法中的 onFullfilled 函数里返回实际要执行的 promise, const p = Promise.resolve().then(() => iteratorFn(item, array)) ret.push(p) // ④ 将执行完毕的 promise 移除 const e = p.then(() => executing.splice(executing.indexOf(e), 1)) // ③ 将正在执行的 promise 插入 executing 数组 executing.push(e) let r = Promise.resolve() // ⑥ 如果正在执行的 promise 数量达到了并发限制,则通过 Promise.race 触发新的 promise 执行 if (executing.length >= poolLimit) { r = Promise.race(executing) } // ⑤ 递归执行 enqueue,直到满足 ① return r.then(() => enqueue()) } return enqueue().then(() => Promise.all(ret)) }

    以上代码大致按执行顺序做了注释,总结起来有以下 4 点:

    从 array 第 1 个元素开始,初始化 promise 对象,同时用一个 executing 数组保存正在执行的 promise 不断初始化 promise,直到达到 poolLimt 使用 Promise.race,获得 executing 中 promise 的执行情况,当有一个 promise 执行完毕,继续初始化 promise 并放入 executing 中 所有 promise 都执行完了,调用 Promise.all 返回 demo

    import asyncPool from "tiny-async-pool"; const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i)); return asyncPool(2, [1000, 5000, 3000, 2000], timeout).then(results => { ... });

    (2)asyncPool-es7.js

    async function asyncPool(poolLimit, array, iteratorFn) { const ret = [] const executing = [] for (const item of array) { const p = Promise.resolve().then(() => iteratorFn(item, array)) ret.push(p) const e = p.then(() => executing.splice(executing.indexOf(e), 1)) executing.push(e) if (executing.length >= poolLimit) { await Promise.race(executing) } } return Promise.all(ret) }

    实现方案和 es6 没有任何区别,但是得益于 async 函数简洁的语法,代码行数和逻辑清晰度上了不止一个台阶。

    import asyncPool from 'tiny-async-pool' const timeout = (i) => new Promise((resolve) => setTimeout(() => resolve(i), i)) const results = await asyncPool(2, [1000, 5000, 3000, 2000], timeout)
    Processed: 0.054, SQL: 8