发布于 2026-01-06 1 阅读
0

JavaScript 中的异步生成器和管道

JavaScript 中的异步生成器和管道

异步生成器简介

本文和上一篇文章《迭代器来了》(讨论异步迭代器)的写作动机都源于我在编写一些async函数时想到的一个问题:是否可以yieldasync函数中实现?换句话说,我们能否将async函数与生成器函数结合起来?

为了探讨这个问题,我们先从一个普通的同步生成器函数开始numberGenerator

const random = (min, max) => Math.floor(Math.random() * (max - min + 1)) + min

const getValue = () => {
    return random(1,10)
}

const numberGenerator = function* () {
    for (let i=0; i<5; i++) {
        const value = getValue() 
        yield value**2
    }
}

const main = () => {
    const numbers = numberGenerator()
    for (const v of numbers) {
        console.log('number = ' + v)
    }
}

main()
Enter fullscreen mode Exit fullscreen mode

这段代码生成 5 个随机数的期望平方:

C:\dev>node gen.js
number = 1
number = 64
number = 36
number = 25
number = 49
Enter fullscreen mode Exit fullscreen mode

我的想法是修改代码getValue,使其返回一个 Promise 对象,然后修改该 Promise 对象,再传入numberGenerator一个值。我尝试了类似下面的方法:awaityield

const random = (min, max) => Math.floor(Math.random() * (max - min + 1)) + min

const getValue = () => {
    //return promise instead of value
    return new Promise(resolve=>{
        setTimeout(()=>resolve(random(1,10)), 1000)
    })
}

const numberGenerator = function* () {
    for (let i=0; i<5; i++) {
        const value = await getValue() //await promise
        yield value**2
    }
}

const main = () => {
    const numbers = numberGenerator()
    for (const v of numbers) {
        console.log('number = ' + v)
    }
}

main()
Enter fullscreen mode Exit fullscreen mode

让我们看看会发生什么:

C:\dev\gen.js:12
                const value = await getValue() //await promise
                              ^^^^^

SyntaxError: await is only valid in async function
    at new Script (vm.js:51:7)
Enter fullscreen mode Exit fullscreen mode

好的,明白了:我们需要实现这个numberGenerator函数async。我们来试试!

const numberGenerator = async function* () { //added async
Enter fullscreen mode Exit fullscreen mode

它有效吗?

C:\dev\gen.js:10
const numberGenerator = async function* () { //added async
                                      ^

SyntaxError: Unexpected token *
    at new Script (vm.js:51:7)
Enter fullscreen mode Exit fullscreen mode

哎呀,失败了。这促使我上网搜索相关信息。原来这种功能将在 ES2018 中发布,而且我们已经可以在最新版本的 Node.js 中使用相应的标志来使用它了--harmony-async-iteration

让我们看看实际效果:

const timer = () => setInterval(()=>console.log('tick'), 1000)

const random = (min, max) => Math.floor(Math.random() * (max - min + 1)) + min

const getValue = () => {
    //return promise instead of value
    return new Promise(resolve=>{
        setTimeout(()=>resolve(random(1,10)), 1000)
    })
}

const numberGenerator = async function* () { //added async
    for (let i=0; i<5; i++) {
        const value = await getValue() //await promise
        yield value**2
    }
}

//main is 'async'
const main = async () => {
    const t = timer()
    const numbers = numberGenerator()

    //use 'for await...of' instead of 'for...of'
    for await (const v of numbers) {
        console.log('number = ' + v)
    }

    clearInterval(t)
}

main()
Enter fullscreen mode Exit fullscreen mode

与之前的代码版本相比,有一些细微的改动:

  • 函数mainfor...of循环变成了一个for await...of循环。
  • 由于我们正在使用awaitmain因此必须标记为async

我们还添加了一个定时器,以便确认生成器确实是异步的。

让我们来看看结果:

C:\dev>node --harmony-async-iteration gen.js
tick
number = 16
tick
number = 1
tick
number = 100
tick
number = 100
tick
number = 49
Enter fullscreen mode Exit fullscreen mode

成功了!

yield生成器函数中的`return`与普通(同步)生成器函数中的 ` asyncreturn` 类似yield。区别在于,普通版本yield会生成一个{value, done}元组,而异步版本会生成一个解析元组的 Promise {value, done}

如果你使用yieldPromise,JavaScript 运行时会做一些有点狡猾的事情:它仍然会生成自己的 Promise,该 Promise 会解析为一个{value, done}元组,但value该元组中的属性将是你的 Promise 解析为的任何内容。

将异步生成器流水线化

让我们来看一个这项技术的巧妙应用:我们将创建一个异步生成器函数,该函数驱动另一个函数来生成异步数字流的统计信息。

这种类型的管道可用于对异步数据流执行任意转换。

首先,我们将编写一个异步生成器,用于生成无限循环的值流。它每秒生成一个介于 0 到 100 之间的随机值:

const random = (min, max) => Math.floor(Math.random() * (max - min + 1)) + min

const asyncNumberGenerator = async function* () {
    while (true) {
        const randomValue = random(0,100)

        const p = new Promise(resolve=>{
            setTimeout(()=>resolve(randomValue), 1000)
        })      

        yield p
    }
}
Enter fullscreen mode Exit fullscreen mode

现在我们将编写一个函数createStatsReducer。该函数返回一个回调函数,exponentialStatsReducer该回调函数将用于迭代计算此数据流的指数移动平均值:

const createStatsReducer = alpha => { 
    const beta = 1 - alpha

    const exponentialStatsReducer = (newValue, accumulator) => {
        const redistributedMean = beta * accumulator.mean

        const meanIncrement = alpha * newValue

        const newMean = redistributedMean + meanIncrement

        const varianceIncrement = alpha * (newValue - accumulator.mean)**2

        const newVariance = beta * (accumulator.variance + varianceIncrement)

        return {
            lastValue: newValue,
            mean: newMean,
            variance: newVariance
        }
    }

    return exponentialStatsReducer
}
Enter fullscreen mode Exit fullscreen mode

接下来是第二个异步生成器函数asyncReduce。这个函数会将 reducer 应用于异步可迭代对象。它的工作方式类似于 JavaScript 内置的 reducer Array.prototype.reduce。但是,标准版本会遍历整个数组来生成最终值,而我们的版本则会延迟应用 reducer。这使得我们可以使用无限序列的值(即上面的异步数字生成器)作为数据源:

const asyncReduce = async function* (iterable, reducer, accumulator) {
    for await (const item of iterable) {
        const reductionResult = reducer(item, accumulator)

        accumulator = reductionResult

        yield reductionResult
    }
}
Enter fullscreen mode Exit fullscreen mode

让我们把所有这些联系起来。下面的代码会将一系列异步生成的数字传递给我们的异步 reduce 函数。我们将循环遍历这些结果值(无限循环),并在新值到达时获取更新后的均值、方差和标准差:

const timer = () => setInterval(()=>console.log('tick'), 1000)

const main = async () => {
    const t = timer()

    const numbers = asyncNumberGenerator()

    const firstValue = await numbers.next()

    //initialize the mean to the first value
    const initialValue = { mean: firstValue.value, variance: 0 }

    console.log('first value = ' + firstValue.value)

    const statsReducer = createStatsReducer(0.1)

    const reducedValues = asyncReduce(numbers, statsReducer, initialValue)

    for await (const v of reducedValues) {
        const lastValue = v.lastValue
        const mean = v.mean.toFixed(2)
        const variance = v.variance.toFixed(2)
        const stdev = Math.sqrt(v.variance).toFixed(2)

        console.log(`last value = ${lastValue}, stats = { mean: ${mean}`
            + `, variance: ${variance}, stdev: ${stdev} }`)
    }

    clearInterval(t)
}

main()
Enter fullscreen mode Exit fullscreen mode

让我们来看一些示例输出:

C:\dev>node --harmony-async-iteration async_stats.js
tick
first value = 51
tick
last value = 97, stats = { mean: 55.60, variance: 190.44, stdev: 13.80 }
tick
last value = 73, stats = { mean: 57.34, variance: 198.64, stdev: 14.09 }
tick
last value = 11, stats = { mean: 52.71, variance: 372.05, stdev: 19.29 }
tick
last value = 42, stats = { mean: 51.64, variance: 345.16, stdev: 18.58 }
tick
last value = 42, stats = { mean: 50.67, variance: 319.00, stdev: 17.86 }
tick
last value = 60, stats = { mean: 51.60, variance: 294.93, stdev: 17.17 }
^C
Enter fullscreen mode Exit fullscreen mode

现在我们可以持续更新异步值流的统计数据了。真棒!

我认为异步生成器函数对于处理这类异步数据源尤其有用。

请告诉我你的想法,或者如果你有关于异步生成器和迭代器的其他使用方法的想法!

参考:

有关的:

文章来源:https://dev.to/nestedsoftware/asynchronous-generators-and-pipelines-in-javascript--1h62