使用Node.js Streams将100万行SQL语句处理成CSV文件
结论
在本练习中,我们将探索 Node.js 的核心概念,重点关注流的强大功能。挑战在于读取一百万行 SQL 数据并将其导出到 CSV 文件。那么,首先,什么是流?
根据Node.js文档,我们可以发现:
Stream 是 Node.js 中用于处理流式数据的抽象接口。
但这到底是什么意思呢?你可以把流(Stream)理解为在 Node.js 中处理连续数据流的一种方式,就像一条数据管道。但我们为什么需要它呢?因为有时我们的计算机无法一次性处理所有数据,数据量实在太大了。流的作用就在于允许我们一次处理一小块数据,就像你在 Netflix 上看剧一样——剧集是分段播放的,而不是一次性全部播放。
在接下来的示例中,我们将使用三种特定类型的流:可读流、转换流和可写流。我们将基于 Postgres 查询,处理结果并将其保存到 CSV 文件中。
您可以在这里找到示例源代码:
Github 仓库
综合起来
让我们先从打好基础开始:
1. 添加 PostgreSQL 依赖项
yarn add pg pg-query-stream
2. 创建数据库池
我们使用该软件包创建了一个数据库连接池函数pg。该连接池将使我们能够管理和处理数据库连接。
import pg from 'pg';
export function createDatabasePool() {
try {
const connectionString = `postgres://${USER}:${PASSWORD}@localhost:5432/postgres`;
const pool = new pg.Pool({ connectionString });
return pool;
} catch (error) {
console.error('Error creating database pool:', error);
throw error;
}
}
3. 配置流
我们创建了三种类型的流来完成我们的任务:可读流用于从数据库中获取数据,转换流用于处理和格式化数据,以及可写流用于将处理后的数据保存到 CSV 文件。
要创建可读流,您需要该包pg-query-stream,它将从 pg 接收结果行作为可读(对象)流。
可读流
该流在服务器上使用游标,因此内存中仅保留少量行,游标大小由变量定义。batchSize
const queryStream = new QueryStream(
"SELECT * FROM generate_series(0, $1) num",
[1000000],
{ batchSize: 1000 }
);
转换流
因为我们接收到的是一个对象,所以需要先转换数据才能将其添加到文件中。我还将新数据添加到数据块中,如下所示。
const transformStream = new Transform({
objectMode: true,
transform(row, encoding, callback) {
row.description = `Row ${row.num}`;
row.date = new Date().toString();
callback(null, `${row.num}, ${row.description}, ${row.date}` + "\n");
},
});
可写流
在这种情况下,我们将数据写入文件,以便可以使用 Node.js 文件流。
const fileWriteStream = fileStream.createWriteStream("output.csv");
启动数据流
配置好数据流后,我们定义一个名为 startStream 的函数来启动数据流过程。在这个函数内部,我们使用连接池建立与数据库的连接,并根据提供的 SQL 查询创建查询流。
const startStream = (transformStream, writeStream) => {
console.log("STARTED ", new Date());
pool.connect((err, client, done) => {
if (err) console.error(err);
const stream = client.query(queryStream);
stream
.pipe(transformStream)
.pipe(writeStream)
.on("error", console.error)
.on("finish", () => {
console.log("FINISHED: ", new Date());
done();
});
});
};
startStream(transformStream, fileWriteStream);
说明:stream.pipe(transformStream)将查询流连接到转换流。这意味着从数据库检索到的数据将通过转换流进行处理。
transformStream.pipe(writeStream):将转换流连接到写入流。然后使用 writeStream 将来自转换流的处理后数据写入指定的文件。
.on("error", console.error):为管道附加一个错误事件监听器。如果在任何阶段发生错误,错误信息将被记录到控制台。
.on("finish", () => {...}):为管道附加一个完成事件监听器。当流式传输、转换和写入的整个过程完成后,将执行此函数。
在完成事件监听器中,使用时间戳记录console.log("FINISHED: ", new Date())数据处理完成的标志。
done()调用此函数将数据库客户端释放回池中,表明该客户端可供重用。
最后,调用 startStream 函数,并将 transformStream 和 fileWriteStream 作为参数,从而有效地启动整个数据处理和写入管道。
过程可视化
要查看该过程的直观表示,请查看终端:
$ node streams.js
STARTED 2023-08-10T05:33:06.521Z
FINISHED: 2023-08-10T05:33:24.567Z
Done in 28.70s.
output.csv同时,还会创建一个同名新文件,其中包含 100 万行转换后的数据!
结论
在本练习中,我们探索了 Node.js 流的强大功能及其高效处理海量数据的能力。我们学习了如何使用 Readable、Transform 和 Writable 流从 PostgreSQL 数据库读取数据、处理数据并将其保存为 CSV 文件。通过将数据处理分解成更小的块,我们可以节省内存并提高应用程序的整体性能。
欢迎您探索代码、尝试不同的设置,并将其应用到您自己的项目中。祝您编程愉快!