Node.js 中的工作线程
介绍
Node.js 中的工作线程是一种将 CPU 密集型任务从 Node 提供的单线程进程中卸载出去的方法。
首先,我们需要了解为什么不能在 Node.js 实例的主进程中运行 CPU 密集型任务。这是因为 Node.js 是单线程的,默认情况下只能创建一个进程。进程是一个全局对象,它记录了当前正在执行的任务信息。
我只有一个主题要分享——Node.js
将 Node.js 做成单线程的决定源于不改变语言本身的设计。向 Javascript 添加多线程模块可能会改变语言本身的编写方式。
Node.js 只有一个事件循环,这使得 Node 具有异步特性,它将操作卸载到系统内核,并通过使用回调、Promise 和 async/await 来获取结果,因此我们不必担心并发问题。
当需要执行 CPU 密集型任务时,这可能会成为一个问题。例如,执行耗时较长的同步任务,或者包含复杂的数学计算的任务,这些计算可能会阻塞线程,导致其他任务必须等待。如果是 API 请求,则此时收到的任何其他 HTTP 请求都会被阻塞,从而导致最终用户长时间等待。解决此问题的方法是使用工作线程。
使用工作线程
我们将使用工作线程来计算斐波那契数列,并利用原子操作和共享缓冲区来帮助我们处理线程之间的竞争条件。
请点击这里查看这篇关于共享缓冲区和原子操作的精彩文章。
我们可以通过将其导入到我们的文件中来轻松使用工作线程模块。
const { Worker } = require('worker_threads');
主要流程
// main.js
const { Worker } = require("worker_threads");
const runFibonnaci = (nums) => {
// get the length of the array
let length = nums.length;
// int32 buffer of each element in the array
let size = Int32Array.BYTES_PER_ELEMENT * length;
// Create buffer for the size ofthe input array
let sharedBuffer = new SharedArrayBuffer(size);
let sharedArray = new Int32Array(sharedBuffer);
for(let i = 0; i < length; i++ ) {
// store each value into the shareArray
Atomics.store(sharedArray, i, nums[i]);
// Spin up a new worker thread
let worker = new Worker('./worker.js');
// Once calculation is done print out result
worker.once('message', (message) => {
console.log('Result received --- ', message);
})
// Send array data and index to worker thread.
worker.postMessage({data: sharedArray, index: i});
}
};
runFibonnaci([50, 20, 21, 24, 4 ]);
该rubFibonnaci函数接受一个要在工作线程中计算的数字数组,该sharedBuffer变量是使用SharedArrayBuffer类创建的size,该类由创建 sharedArrayBuffer 大小的变量定义。
// get the length of the array
let length = nums.length;
// int32 buffer of each element in the array
let size = Int32Array.BYTES_PER_ELEMENT * length;
// Create buffer for the size ofthe input array
let sharedBuffer = new SharedArrayBuffer(size);
let sharedArray = new Int32Array(sharedBuffer);
该sharedArray变量也是通过int32Array类创建的,该类用于创建一个 32 位有符号整数数组。我们使用原子操作来存储该变量sharedArray,以便每个工作线程都可以从单个内存实例访问该shareArray变量。原子操作仅适用于 SharedArrayBuffers 和 ArrayBuffers。
当内存共享时,多个线程可以读写内存中的相同数据。原子操作确保读写的是可预测的值,确保操作在下一个操作开始之前完成,并且这些操作不会被中断。根据MDN 文档
接下来,我们遍历传递nums给runFibonnaci函数的数组,然后使用静态函数存储每个值Atomic.store。
for(let i = 0; i < length; i++ ) {
// store each value into the shareArray
Atomics.store(sharedArray, i, nums[i]);
// Spin up a new worker thread
let worker = new Worker('./worker.js');
// Once calculation is done print out result
worker.once('message', (message) => {
console.log('Result received --- ', message);
})
// Send array data and index to worker thread.
worker.postMessage({data: sharedArray, index: i});
}
然后我们启动一个新的工作线程,并将参数sharedArray和值发送index到该工作线程。该worker.once('message')函数会在工作线程完成任务后被调用,并返回一个值,该值将在下面的工作线程文件中显示。
工人过程
// worker.js
const { Worker, isMainThread, parentPort } = require('worker_threads');
// Listen for message from main thread
parentPort.once('message', (event) => {
const sharedArray = event.data;
const index = event.index;
const arrValue = Atomics.load(sharedArray, index);
const fibonaciValue = calculateFibonacci(arrValue);
parentPort.postMessage(fibonaciValue);
});
const calculateFibonacci = (num) => {
var a = 1, b = 0, temp;
while (num >= 0){
temp = a;
a = a + b;
b = temp;
num--;
}
return b;
}
该parentPort.once函数在工作进程初始化并接收到数据后调用,它会加载索引sharedArray并将其存储在一个变量中。然后,它使用 Atomics.load 函数arrValue从索引中获取值,并通过调用函数计算该值的斐波那契数列,最后将计算结果返回给主进程以在控制台上打印。sharedArraycalculateFibonacci
您可以通过在控制台中运行此命令来运行代码node main.js
。
// console
Fibonacci received --- 20365011074
Fibonacci received --- 17711
Fibonacci received --- 75025
Fibonacci received --- 10946
Fibonacci received --- 5
结论
使用工作线程可以帮助你的 Node.js 应用程序在线程中执行 CPU 密集型任务。工作线程并不会神奇地让你的应用程序运行得更快,但它可以帮助解决某些特定指令集阻塞单个进程并导致其他任务失败的情况。
源代码可以在这里找到。
图片来自Unsplash用户K15 Photos
文章来源:https://dev.to/feezyhendrix/worker-threads-in-node-js-2ikh