发布于 2026-01-06 0 阅读
0

使用 Apache Kafka 构建实时数据流应用程序

使用 Apache Kafka 构建实时数据流应用程序

作者:Alexander Nnakwue ✏️

介绍

大多数大型科技公司通过各种方式从用户那里获取数据,而且大多数情况下,这些数据都是原始数据。如果能将数据转换成易于理解和使用的格式,就能更好地满足业务需求。真正的挑战在于如何处理这些数据,并在必要时进行转换或清洗,使其具有实际意义。

基本的数据流应用程序将数据从源存储桶移动到目标存储桶。更复杂的应用程序涉及流处理,它们可以动态执行一些特殊操作,例如改变输出数据的结构或添加新的属性或字段。

本文将介绍如何使用 Apache Kafka 构建一个最小化的实时数据流应用程序。文章还将探讨以下内容:

  • Kafka 和 ZooKeeper 作为我们的工具
  • 批量数据处理和存储
  • 在本地安装和运行 Kafka
  • 启动我们的应用程序
  • 安装依赖项
  • 创建 Kafka 主题
  • 针对创建的主题进行创作
  • 从某个主题中汲取灵感

根据其官网介绍,Kafka 是一个开源的、高度分布式的流式处理平台。它由 LinkedIn(现为 Apache 软件基金会的一部分)的工程师开发,以其可靠性、弹性和可扩展性而著称,支持流式事件/应用程序。它具有水平扩展性,默认具备容错能力,并提供高速处理能力。

Kafka 有多种用例,其中之一是构建数据管道或应用程序,以实时处理流式事件和/或批量数据。

我们将使用 Apache Kafka 来学习如何构建数据管道以传输批量数据。作为一个简单的演示,我们将模拟一个由数据源生成的大型 JSON 数据存储。

接下来,我们将编写一个生产者脚本,将来自某个源(例如 A 点)的 JSON 数据写入本地 Kafka 集群上的特定主题。最后,我们将编写一个消费者脚本,从指定的 Kafka 主题中消费已存储的数据。

注意:数据转换和/或增强主要是在数据从输入主题被另一个应用程序或输出主题使用时进行的。

这是数据工程中非常常见的场景,因为总是需要清理、转换、聚合甚至重新处理 Kafka 主题中通常原始且临时存储的数据,使其符合特定的标准或格式。

LogRocket 免费试用横幅

先决条件

要学习本教程,您需要:

  • 您的计算机上已安装最新版本的 Node.js 和 npm。
  • 您计算机上安装的最新 Java 版本 ( JVM )
  • Kafka 已安装在您的本地计算机上。在本教程中,我们将逐步介绍如何在本地计算机上安装 Kafka。
  • 对编写Node.js应用程序有基本的了解

不过,在继续之前,让我们回顾一下关于 Kafka 的一些基本概念和术语,以便我们能够轻松地跟随本教程进行学习。

动物园管理员

Kafka 高度依赖 ZooKeeper,后者是 Kafka 用于跟踪集群状态的服务。ZooKeeper 负责控制 Kafka broker 或服务器的同步和配置,包括选择合适的 Leader。有关 ZooKeeper 的更多详细信息,您可以查阅其详尽的文档

话题

Kafka 主题是跨多个 Kafka broker 的一组分区或数据组。为了更清晰地理解,主题充当集群中流式数据的间歇性存储机制。对于每个 Kafka 主题,我们可以设置复制因子和其他参数,例如分区数等。

生产者、消费者和产业集群

生产者是向 Kafka 代理或更准确地说是 Kafka 主题写入数据的客户端。消费者则读取数据,或者顾名思义,从 Kafka 主题或 Kafka 代理中消费数据。集群是指为当前 Kafka 实例提供支持的一组代理或服务器。

生产者、产业集群和消费者之间的关系
图 1:显示 Kafka 中生产者、集群和消费者之间的关系。

有关所有这些重要概念的更多详细信息,您可以查看Apache Kafka 文档的这一部分。

安装 Kafka

要安装 Kafka,我们只需从这里下载二进制文件并解压即可。我们可以通过在终端或命令提示符下运行以下命令来完成此操作:

cd <location-of-downloaded-kafka-binary>
tar -xzf <downloaded-kafka-binary>
cd <name-of_kafka-binary>
Enter fullscreen mode Exit fullscreen mode

tar命令会解压下载的 Kafka 二进制文件。之后,我们导航到 Kafka 的安装目录。我们将看到如下所示的所有文件:

已安装的 Kafka 文件夹结构及文件
图 2:已安装的 Kafka 文件夹结构及其文件的屏幕截图。

注: Kafka 二进制文件可以下载到我们机器上的任何路径。此外,截至撰写本文时,Kafka 的最新版本为 2.3.0。

此外,如果我们向上移动一级目录(cd ..),我们会config在下载的 Kafka 二进制文件目录下找到一个文件夹。在这里,我们可以配置 Kafka 服务器,并添加任何我们需要的更改或配置。现在,让我们开始吧:

cd ..
ls
cd config
ls
nano server.properties
Enter fullscreen mode Exit fullscreen mode

配置 Kafka 服务器
图 3:如何配置 Kafka 服务器。

现在我们已经知道了如何配置 Kafka 服务器,接下来将学习如何使用 Kafka。稍后,我们将学习可以重新配置或更新server.properties文件中的字段。

在本教程中,我们将使用适用于 Node.js 的kafka-node客户端库。请注意,Kafka也为其他编程语言提供了客户端,因此您可以随意选择任何其他语言来使用 Kafka。

卡夫卡来救场了

由于本练习中使用的是 Node.js,我们将首先创建一个结构最基本的应用程序。首先,我们将创建一个新目录来存放项目,并进入该目录,如下所示:

mkdir kafka-sample-app
cd kafka-sample-app
Enter fullscreen mode Exit fullscreen mode

package.json然后我们可以通过运行命令来创建一个文件npm init

现在我们可以按照说明像往常一样设置项目。package.json完成后,我们的文件应该如下所示:

{
  "name": "kafka-producer_consumer_tutorial",
  "version": "1.0.0",
  "description": "Building a real-time data streaming application pipeline with Apache Kafka",
  "main": "app.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1",
    "start": "node producer.js"
  },
  "author": "Alexander Nnakwue",
  "license": "MIT",
  "dependencies": {
    "dotenv": "^8.2.0",
    "kafka-node": "^4.1.3"
  }
}
Enter fullscreen mode Exit fullscreen mode

这里我们安装了两个稍后会用到的依赖项。要安装 kafka-node 客户端,我们npm install kafka-node在终端运行命令。kafka -node 的文档可以在 npm 上找到。该dotenv软件包用于为我们的应用程序设置环境变量。要安装该软件包,我们可以运行命令npm install dotenv

现在依赖项已经安装完毕,我们可以继续创建所有必要的文件,如下图所示:

文件层级结构
图 4:我们的文件层级结构。

上图显示了我们的应用程序所需的所有必要文件。让我们逐个查看文件,了解其功能。

首先,要从终端手动创建一个新主题,我们可以使用以下命令:

./kafka-topics.sh --create --zookeeper <ZOOKEEPER_URL:PORT> --replication-factor <NO_OF_REPLICATIONS> --partitions <NO_OF_PARTITIONS> --topic <TOPIC_NAME>
Enter fullscreen mode Exit fullscreen mode

请注意,我们不应忘记将<ZOOKEEPER_URL:PORT><NO_OF_REPLICATIONS><NO_OF_PARTITIONS>和更新<TOPIC_NAME>为实际值。

不过,在本教程中,我们有一个脚本可以帮我们处理这个问题。创建新主题的代码可以在createTopic.js文件中找到。代码也显示在下方:

const kafka = require('kafka-node');
const config  = require('./config');

const client = new kafka.KafkaClient({kafkaHost: config.KafkaHost});



const topicToCreate = [{
  topic: config.KafkaTopic,
  partitions: 1,
  replicationFactor: 1
}
];

client.createTopics(topicToCreate, (error, result) => {
  // result is an array of any errors if a given topic could not be created
  console.log(result, 'topic created successfully');
});
Enter fullscreen mode Exit fullscreen mode

在这里,我们导入 Kafka 客户端并连接到我们的 Kafka 服务器。您可能会注意到,在我们的用例中,我们从未配置过副本因子。然而,这并不符合实际情况。

在生产环境中,我们可以根据需要处理的数据量或消息量设置多个 Kafka broker。让我们看看如何在本地环境中实现这一点。

  • 导航到我们下载的二进制文件中的 config 目录cd config
  • 打开 Kafkaserver.properties文件。该文件包含 Kafka 服务器设置的所有配置。我们可以使用以下nano server.properties命令打开该文件。
  • 现在,我们可以创建此文件的多个副本,并仅修改其他副本文件中的一些配置。也就是说,在复制的文件中,我们可以更改一些唯一字段,例如 `<your-name>` broker.idlog.dirs`<your-name>` 和代理或主机端口。有关配置 Kafka 设置的更多信息,请参阅相关文档。

创建主题后,我们现在可以向其中写入数据。写入主题的代码位于文件中producer.js,代码如下所示:

const Kafka = require('kafka-node');
const config  = require('./config');

const Producer = Kafka.Producer;
const client = new Kafka.KafkaClient({kafkaHost: config.KafkaHost});
const producer = new Producer(client,  {requireAcks: 0, partitionerType: 2});



const pushDataToKafka =(dataToPush) => {

  try {
  let payloadToKafkaTopic = [{topic: config.KafkaTopic, messages: JSON.stringify(dataToPush) }];
  console.log(payloadToKafkaTopic);
  producer.on('ready', async function() {
    producer.send(payloadToKafkaTopic, (err, data) => {
          console.log('data: ', data);
  });

  producer.on('error', function(err) {
    //  handle error cases here
  })
  })
  }
catch(error) {
  console.log(error);
}

};


const jsonData = require('./app_json.js');

pushDataToKafka(jsonData);
Enter fullscreen mode Exit fullscreen mode

在这里,我们导入了 kafka-node 库,并配置客户端以接收来自 Kafka broker 的连接。连接建立后,我们将数据发送到指定的 Kafka 主题。请注意,在实际应用中,我们需要在操作完成后调用相应client.close()方法关闭客户端的连接。

现在,当我们使用该./start.sh命令运行启动脚本时,数据就会写入到我们的 Kafka 主题中。

npm start
Enter fullscreen mode Exit fullscreen mode

要读取主题中的数据,我们可以使用consumer.js文件中的消费者脚本运行它node ./consumer.js。我们得到以下输出:

运行消费者脚本
图 5:运行消费者脚本从 Kafka 主题读取数据。

文件的代码consumer.js如下所示:

const kafka = require('kafka-node');
const config = require('./config');

try {
 const Consumer = kafka.Consumer;
 const client = new kafka.KafkaClient({idleConnection: 24 * 60 * 60 * 1000,  kafkaHost: config.KafkaHost});

 let consumer = new Consumer(
    client,
    [{ topic: config.KafkaTopic, partition: 0 }],
    {
      autoCommit: true,
      fetchMaxWaitMs: 1000,
      fetchMaxBytes: 1024 * 1024,
      encoding: 'utf8',
      // fromOffset: false
    }
  );
  consumer.on('message', async function(message) {
    console.log(
      'kafka ',
      JSON.parse(message.value)
    );
  })
  consumer.on('error', function(error) {
    //  handle error 
    console.log('error', error);
  });
}
catch(error) {
  // catch error trace
  console.log(error);
}
Enter fullscreen mode Exit fullscreen mode

在这里,我们连接到 Kafka 客户端并从预定义的 Kafka 主题中消费数据。

注意:完成配置并准备启动应用程序时,必须先启动 ZooKeeper 服务器。之后,才能启动 Kafka 服务器。这是因为 Kafka 的运行依赖于 ZooKeeper。

要启动 ZooKeeper 服务器,我们可以从终端运行以下命令:

bin/zookeeper-server-start.sh config/zookeeper.properties
Enter fullscreen mode Exit fullscreen mode

要启动 Kafka 服务器,我们可以运行:

bin/Kafka-server-start.sh config/server.properties
Enter fullscreen mode Exit fullscreen mode

另外,我们可以通过运行以下命令来检查代理中可用的 Kafka 主题数量:

bin/Kafka-topics.sh --list --zookeeper localhost:2181
Enter fullscreen mode Exit fullscreen mode

最后,我们还可以通过在终端上运行消费者控制台命令来从 Kafka 主题中消费数据,如下所示:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-example-topic  --from-beginning
Enter fullscreen mode Exit fullscreen mode

此外,Kafka 还提供了一个脚本,允许开发人员手动在集群上创建主题。该脚本如下所示:

./kafka-topics.sh --create --zookeeper <ZOOKEEPER_URL:PORT> --replication-factor <NO_OF_REPLICATIONS> --partitions <NO_OF_PARTITIONS> --topic <TOPIC_NAME>
Enter fullscreen mode Exit fullscreen mode

注意:在创建 Kafka 主题之前,我们必须分别在不同的终端窗口中启动 ZooKeeper 和 Kafka 服务器。

最后,本教程的代码已上传至此GitHub 仓库。要了解 Kafka 的设计理念,您可以查看文档的这一部分。在未来的教程中,我们将探讨 Kafka API 提供的其他工具,例如 Kafka Streams 和 Kafka Connect。有关这些工具的介绍,您可以查看文档的这一部分。

概括

总而言之,Kafka 可以像 RabbitMQ 一样,作为发布/订阅系统,用于构建批量数据的读写流。它还可以用于构建高弹性、可扩展的实时流处理应用程序。值得注意的是,这种流处理可以根据一些预定义的事件动态完成。

此外,与消息系统类似,Kafka 也采用了由高容错性集群组成的存储机制,这些集群具有复制功能且高度分布式。所谓复制,是指数据可以分布在多个不同的集群中,从而将整个链中的数据丢失降至最低。

总而言之,Kafka 可以作为独立插件集成到其他系统中。在这种情况下,它可以根据需要独立扩展。这意味着我们可以独立扩展生产者和消费者,而不会对整个应用程序造成任何副作用。

最后,我们已经了解到,构建数据管道涉及将数据从源点(数据生成的地方,请注意,这也可以指来自其他应用程序的数据输出)移动到目标点(另一个应用程序需要或使用数据的地方)。现在我们可以继续探索其他更复杂的用例了。

如果您有任何疑问,请随时在下方评论区留言或在推特上联系我。


编者按:发现本文有误?您可以在这里找到正确版本。

插件:LogRocket,一款用于 Web 应用的 DVR

 
LogRocket 控制面板免费试用横幅
 
LogRocket是一款前端日志工具,可让您重现问题,如同在您自己的浏览器中发生一样。无需猜测错误原因,也无需用户提供屏幕截图和日志转储,LogRocket 即可让您重现会话,快速了解问题所在。它与任何框架的应用程序完美兼容,并提供插件来记录来自 Redux、Vuex 和 @ngrx/store 的额外上下文信息。
 
除了记录 Redux 操作和状态之外,LogRocket 还会记录控制台日志、JavaScript 错误、堆栈跟踪、包含标头和正文的网络请求/响应、浏览器元数据以及自定义日志。它还会对 DOM 进行插桩,记录页面上的 HTML 和 CSS,即使是最复杂的单页应用程序,也能生成像素级精确的视频。
 
免费试用


文章“使用 Apache Kafka 构建实时数据流应用程序”最初发表于LogRocket 博客

文章来源:https://dev.to/bnevilleoneill/building-a-real-time-data-streaming-app-with-apache-kafka-1n5p