JavaScript 中的异步生成器和管道
异步生成器简介
本文和上一篇文章《迭代器来了》(讨论异步迭代器)的写作动机都源于我在编写一些async函数时想到的一个问题:是否可以yield在async函数中实现?换句话说,我们能否将async函数与生成器函数结合起来?
为了探讨这个问题,我们先从一个普通的同步生成器函数开始numberGenerator:
const random = (min, max) => Math.floor(Math.random() * (max - min + 1)) + min
const getValue = () => {
return random(1,10)
}
const numberGenerator = function* () {
for (let i=0; i<5; i++) {
const value = getValue()
yield value**2
}
}
const main = () => {
const numbers = numberGenerator()
for (const v of numbers) {
console.log('number = ' + v)
}
}
main()
这段代码生成 5 个随机数的期望平方:
C:\dev>node gen.js
number = 1
number = 64
number = 36
number = 25
number = 49
我的想法是修改代码getValue,使其返回一个 Promise 对象,然后修改该 Promise 对象,再传入numberGenerator一个值。我尝试了类似下面的方法:awaityield
const random = (min, max) => Math.floor(Math.random() * (max - min + 1)) + min
const getValue = () => {
//return promise instead of value
return new Promise(resolve=>{
setTimeout(()=>resolve(random(1,10)), 1000)
})
}
const numberGenerator = function* () {
for (let i=0; i<5; i++) {
const value = await getValue() //await promise
yield value**2
}
}
const main = () => {
const numbers = numberGenerator()
for (const v of numbers) {
console.log('number = ' + v)
}
}
main()
让我们看看会发生什么:
C:\dev\gen.js:12
const value = await getValue() //await promise
^^^^^
SyntaxError: await is only valid in async function
at new Script (vm.js:51:7)
好的,明白了:我们需要实现这个numberGenerator函数async。我们来试试!
const numberGenerator = async function* () { //added async
它有效吗?
C:\dev\gen.js:10
const numberGenerator = async function* () { //added async
^
SyntaxError: Unexpected token *
at new Script (vm.js:51:7)
哎呀,失败了。这促使我上网搜索相关信息。原来这种功能将在 ES2018 中发布,而且我们已经可以在最新版本的 Node.js 中使用相应的标志来使用它了--harmony-async-iteration。
让我们看看实际效果:
const timer = () => setInterval(()=>console.log('tick'), 1000)
const random = (min, max) => Math.floor(Math.random() * (max - min + 1)) + min
const getValue = () => {
//return promise instead of value
return new Promise(resolve=>{
setTimeout(()=>resolve(random(1,10)), 1000)
})
}
const numberGenerator = async function* () { //added async
for (let i=0; i<5; i++) {
const value = await getValue() //await promise
yield value**2
}
}
//main is 'async'
const main = async () => {
const t = timer()
const numbers = numberGenerator()
//use 'for await...of' instead of 'for...of'
for await (const v of numbers) {
console.log('number = ' + v)
}
clearInterval(t)
}
main()
与之前的代码版本相比,有一些细微的改动:
- 函数
main的for...of循环变成了一个for await...of循环。 - 由于我们正在使用
await,main因此必须标记为async
我们还添加了一个定时器,以便确认生成器确实是异步的。
让我们来看看结果:
C:\dev>node --harmony-async-iteration gen.js
tick
number = 16
tick
number = 1
tick
number = 100
tick
number = 100
tick
number = 49
成功了!
yield生成器函数中的`return`与普通(同步)生成器函数中的 `asyncreturn` 类似yield。区别在于,普通版本yield会生成一个{value, done}元组,而异步版本会生成一个解析为元组的 Promise{value, done}。如果你使用
yieldPromise,JavaScript 运行时会做一些有点狡猾的事情:它仍然会生成自己的 Promise,该 Promise 会解析为一个{value, done}元组,但value该元组中的属性将是你的 Promise 解析为的任何内容。
将异步生成器流水线化
让我们来看一个这项技术的巧妙应用:我们将创建一个异步生成器函数,该函数驱动另一个函数来生成异步数字流的统计信息。
这种类型的管道可用于对异步数据流执行任意转换。
首先,我们将编写一个异步生成器,用于生成无限循环的值流。它每秒生成一个介于 0 到 100 之间的随机值:
const random = (min, max) => Math.floor(Math.random() * (max - min + 1)) + min
const asyncNumberGenerator = async function* () {
while (true) {
const randomValue = random(0,100)
const p = new Promise(resolve=>{
setTimeout(()=>resolve(randomValue), 1000)
})
yield p
}
}
现在我们将编写一个函数createStatsReducer。该函数返回一个回调函数,exponentialStatsReducer该回调函数将用于迭代计算此数据流的指数移动平均值:
const createStatsReducer = alpha => {
const beta = 1 - alpha
const exponentialStatsReducer = (newValue, accumulator) => {
const redistributedMean = beta * accumulator.mean
const meanIncrement = alpha * newValue
const newMean = redistributedMean + meanIncrement
const varianceIncrement = alpha * (newValue - accumulator.mean)**2
const newVariance = beta * (accumulator.variance + varianceIncrement)
return {
lastValue: newValue,
mean: newMean,
variance: newVariance
}
}
return exponentialStatsReducer
}
接下来是第二个异步生成器函数asyncReduce。这个函数会将 reducer 应用于异步可迭代对象。它的工作方式类似于 JavaScript 内置的 reducer Array.prototype.reduce。但是,标准版本会遍历整个数组来生成最终值,而我们的版本则会延迟应用 reducer。这使得我们可以使用无限序列的值(即上面的异步数字生成器)作为数据源:
const asyncReduce = async function* (iterable, reducer, accumulator) {
for await (const item of iterable) {
const reductionResult = reducer(item, accumulator)
accumulator = reductionResult
yield reductionResult
}
}
让我们把所有这些联系起来。下面的代码会将一系列异步生成的数字传递给我们的异步 reduce 函数。我们将循环遍历这些结果值(无限循环),并在新值到达时获取更新后的均值、方差和标准差:
const timer = () => setInterval(()=>console.log('tick'), 1000)
const main = async () => {
const t = timer()
const numbers = asyncNumberGenerator()
const firstValue = await numbers.next()
//initialize the mean to the first value
const initialValue = { mean: firstValue.value, variance: 0 }
console.log('first value = ' + firstValue.value)
const statsReducer = createStatsReducer(0.1)
const reducedValues = asyncReduce(numbers, statsReducer, initialValue)
for await (const v of reducedValues) {
const lastValue = v.lastValue
const mean = v.mean.toFixed(2)
const variance = v.variance.toFixed(2)
const stdev = Math.sqrt(v.variance).toFixed(2)
console.log(`last value = ${lastValue}, stats = { mean: ${mean}`
+ `, variance: ${variance}, stdev: ${stdev} }`)
}
clearInterval(t)
}
main()
让我们来看一些示例输出:
C:\dev>node --harmony-async-iteration async_stats.js
tick
first value = 51
tick
last value = 97, stats = { mean: 55.60, variance: 190.44, stdev: 13.80 }
tick
last value = 73, stats = { mean: 57.34, variance: 198.64, stdev: 14.09 }
tick
last value = 11, stats = { mean: 52.71, variance: 372.05, stdev: 19.29 }
tick
last value = 42, stats = { mean: 51.64, variance: 345.16, stdev: 18.58 }
tick
last value = 42, stats = { mean: 50.67, variance: 319.00, stdev: 17.86 }
tick
last value = 60, stats = { mean: 51.60, variance: 294.93, stdev: 17.17 }
^C
现在我们可以持续更新异步值流的统计数据了。真棒!
我认为异步生成器函数对于处理这类异步数据源尤其有用。
请告诉我你的想法,或者如果你有关于异步生成器和迭代器的其他使用方法的想法!
参考:
有关的:
- 迭代器即将到来
- 仔细检查 JavaScript 等待
- 流数据的指数移动平均
- 如何在 JavaScript 中序列化并发操作:回调、Promise 和 Async/Await
- 使用生成器、Map、Filter 和 Reduce 在 JavaScript 中实现惰性求值