发布于 2026-01-06 0 阅读
0

使用 Node.js Streams 将 100 万行 SQL 数据处理为 CSV 文件 结论

使用Node.js Streams将100万行SQL语句处理成CSV文件

结论

在本练习中,我们将探索 Node.js 的核心概念,重点关注流的强大功能。挑战在于读取一百万行 SQL 数据并将其导出到 CSV 文件。那么,首先,什么是流?

根据Node.js文档,我们可以发现:

Stream 是 Node.js 中用于处理流式数据的抽象接口。

但这到底是什么意思呢?你可以把流(Stream)理解为在 Node.js 中处理连续数据流的一种方式,就像一条数据管道。但我们为什么需要它呢?因为有时我们的计算机无法一次性处理所有数据,数据量实在太大了。流的作用就在于允许我们一次处理一小块数据,就像你在 Netflix 上看剧一样——剧集是分段播放的,而不是一次性全部播放。

在接下来的示例中,我们将使用三种特定类型的流:可读流、转换流和可写流。我们将基于 Postgres 查询,处理结果并将其保存到 CSV 文件中。

您可以在这里找到示例源代码:
Github 仓库


综合起来

让我们先从打好基础开始:

1. 添加 PostgreSQL 依赖项

yarn add pg pg-query-stream
Enter fullscreen mode Exit fullscreen mode

2. 创建数据库池

我们使用该软件包创建了一个数据库连接池函数pg。该连接池将使我们能够管理和处理数据库连接。

import pg from 'pg';

export function createDatabasePool() {
  try {
    const connectionString = `postgres://${USER}:${PASSWORD}@localhost:5432/postgres`;
    const pool = new pg.Pool({ connectionString });
    return pool;
  } catch (error) {
    console.error('Error creating database pool:', error);
    throw error;
  }
}
Enter fullscreen mode Exit fullscreen mode

3. 配置流

我们创建了三种类型的流来完成我们的任务:可读流用于从数据库中获取数据,转换流用于处理和格式化数据,以及可写流用于将处理后的数据保存到 CSV 文件。

要创建可读流,您需要该包pg-query-stream,它将从 pg 接收结果行作为可读(对象)流。

可读流

该流在服务器上使用游标,因此内存中仅保留少量行,游标大小由变量定义。batchSize

const queryStream = new QueryStream(
  "SELECT * FROM generate_series(0, $1) num",
  [1000000],
  { batchSize: 1000 }
);
Enter fullscreen mode Exit fullscreen mode

转换流

因为我们接收到的是一个对象,所以需要先转换数据才能将其添加到文件中。我还将新数据添加到数据块中,如下所示。

const transformStream = new Transform({
  objectMode: true,
  transform(row, encoding, callback) {
    row.description = `Row ${row.num}`;
    row.date = new Date().toString();
    callback(null, `${row.num}, ${row.description}, ${row.date}` + "\n");
  },
});
Enter fullscreen mode Exit fullscreen mode

可写流

在这种情况下,我们将数据写入文件,以便可以使用 Node.js 文件流。

const fileWriteStream = fileStream.createWriteStream("output.csv");
Enter fullscreen mode Exit fullscreen mode

启动数据流

配置好数据流后,我们定义一个名为 startStream 的函数来启动数据流过程。在这个函数内部,我们使用连接池建立与数据库的连接,并根据提供的 SQL 查询创建查询流。

const startStream = (transformStream, writeStream) => {
  console.log("STARTED ", new Date());
  pool.connect((err, client, done) => {
    if (err) console.error(err);

    const stream = client.query(queryStream);

    stream
      .pipe(transformStream)
      .pipe(writeStream)
      .on("error", console.error)
      .on("finish", () => {
        console.log("FINISHED: ", new Date());
        done();
      });
  });
};

startStream(transformStream, fileWriteStream);
Enter fullscreen mode Exit fullscreen mode

说明:
stream.pipe(transformStream)将查询流连接到转换流。这意味着从数据库检索到的数据将通过转换流进行处理。

transformStream.pipe(writeStream):将转换流连接到写入流。然后使用 writeStream 将来自转换流的处理后数据写入指定的文件。

.on("error", console.error):为管道附加一个错误事件监听器。如果在任何阶段发生错误,错误信息将被记录到控制台。

.on("finish", () => {...}):为管道附加一个完成事件监听器。当流式传输、转换和写入的整个过程完成后,将执行此函数。

在完成事件监听器中,使用时间戳记录console.log("FINISHED: ", new Date())数据处理完成的标志。

done()调用此函数将数据库客户端释放回池中,表明该客户端可供重用。

最后,调用 startStream 函数,并将 transformStream 和 fileWriteStream 作为参数,从而有效地启动整个数据处理和写入管道。

过程可视化

要查看该过程的直观表示,请查看终端:

$ node streams.js
STARTED  2023-08-10T05:33:06.521Z
FINISHED:  2023-08-10T05:33:24.567Z
Done in 28.70s.
Enter fullscreen mode Exit fullscreen mode

output.csv同时,还会创建一个同名新文件,其中包含 100 万行转换后的数据!

结论

在本练习中,我们探索了 Node.js 流的强大功能及其高效处理海量数据的能力。我们学习了如何使用 Readable、Transform 和 Writable 流从 PostgreSQL 数据库读取数据、处理数据并将其保存为 CSV 文件。通过将数据处理分解成更小的块,我们可以节省内存并提高应用程序的整体性能。
欢迎您探索代码、尝试不同的设置,并将其应用到您自己的项目中。祝您编程愉快!

文章来源:https://dev.to/danielevilela/processing-1-million-sql-rows-to-csv-using-nodejs-streams-3in2