发布于 2026-01-06 6 阅读
0

最简单的 Spring Kafka 生产者和消费者参考

最简单的 Spring Kafka 生产者和消费者

参考

现在让我们使用 spring-kafka 构建并运行一个简单的 Kafka Consumer 和 Kafka Producer 示例。如果您需要本文中使用的 Kafka、Spring Boot 或 Docker 方面的帮助,或者想查看本文中的示例应用程序,请参阅下面的“参考资料”部分。

第一步是创建一个简单的Spring Boot Maven 应用程序,并确保 pom.xml 文件中包含 spring-kafka 依赖项。

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
Enter fullscreen mode Exit fullscreen mode

创建一个 Spring Kafka Consumer

现在让我们使用 spring-boot 的默认配置,用 spring-kafka 编写一个最简单的 Kafka 消费者。

创建一个名为 `<class>` 的类SimpleConsumer,并添加一个带有注解的方法@KakfaListener

package io.stockgeeks.springkafka.springkafkaapp;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class SimpleConsumer {
  @KafkaListener(id = "simple-consumer", topics = "simple-message")
  public void consumeMessage(String message) {
    System.out.println("Got message: " + message);
  }
}
Enter fullscreen mode Exit fullscreen mode

就是这样,只需要这些就够了,因为我们使用的是 spring-boot 的默认配置。

启动 Kafka 和 Zookeeper

正如我们在另一篇文章中详细看到的,我们将使用 docker-compose 来运行本地 Kafka 进行开发,让我们启动 Kafka 和 Zookeeper 容器:

docker-compose up -d
Enter fullscreen mode Exit fullscreen mode

请确保容器正在运行:

docker ps
Enter fullscreen mode Exit fullscreen mode

你应该能看到 Kafka 和 Zookeeper 的运行情况:

console-kafka-zookeeper-running-docker

运行该应用程序

现在让我们编译并运行应用程序。如果您需要更详细的说明,请查看此帖子。运行以下命令来构建和运行应用程序:

mvn clean package 

Enter fullscreen mode Exit fullscreen mode

让我们开始吧:

mvn spring-boot:run
Enter fullscreen mode Exit fullscreen mode

应用程序将启动,您将在标准输出上看到消费者的配置、正在使用的 Kafka 版本和一条消息Started SpringKafkaApplication in x seconds

应用程序已启动

请确保应用程序保持运行状态,不要关闭运行该应用程序的终端窗口。现在,让我们使用 Kafka 控制台生产者生成几条消息,并观察消费者如何处理这些消息并将其记录下来。

使用 Kafka 控制台生产者生成消息

打开一个新的终端,进入 Kafka 运行容器,这样我们就可以使用控制台生产者了:

docker exec -it kafka /bin/bash

Enter fullscreen mode Exit fullscreen mode

进入容器后cd /opt/kafka/bin,我们当前使用的镜像中 Kafka 的命令行脚本位于此文件夹中。如果您使用的是其他 Docker 镜像,这些脚本可能位于其他位置。

运行控制台生产者,即可向 Kafka 发送消息:

./kafka-console-producer.sh --broker-list localhost:9092 --topic simple-message
Enter fullscreen mode Exit fullscreen mode

kafka-console-producer

现在控制台会阻塞,您可以输入消息并按回车键。每次执行此操作,都会向标准输出发送一条消息simple-topic。尝试发送几条消息,并在运行 Spring Boot 应用程序的 shell 中观察应用程序的标准输出,看看消息是如何被处理和打印的。

console-producer-spring-kafka-consumer

编写一个简单的生产者

现在来创建我们的 Spring Kafka 生产者。创建一个名为 `producer` 的类SimpleProducer,我们将像创建消费者一样,再次使用默认配置来配置生产者。

package io.stockgeeks.springkafka.springkafkaapp;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class SimpleProducer {

  private KafkaTemplate<String, String> simpleProducer;

  public SimpleProducer(KafkaTemplate<String, String> simpleProducer) {
    this.simpleProducer = simpleProducer;
  }
  public void send(String message) {
    simpleProducer.send("simple-message", message);
  }
}

Enter fullscreen mode Exit fullscreen mode

编写一个端点

现在让我们创建一个简单的端点,它将接收一条文本消息并将其发布到 Kafka,目前我们始终返回 200 OK。

package io.stockgeeks.springkafka.springkafkaapp;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api")
public class MessageApi {

  private final SimpleProducer simpleProducer;

  public MessageApi(SimpleProducer simpleProducer) {
    this.simpleProducer = simpleProducer;
  }

  @PostMapping("/message")
  public ResponseEntity<String> message(@RequestBody String message) {
    simpleProducer.send(message);
    return ResponseEntity.ok("Message received: " + message);
  }
}

Enter fullscreen mode Exit fullscreen mode

构建并运行应用程序。

mvn clean package && mvn spring-boot:run
Enter fullscreen mode Exit fullscreen mode

现在在开发机器上运行应用程序时,很有可能会出错,这是因为您的应用程序在普通主机网络中运行,而 Kafka 和 zookeeper 在“docker 网络”中运行。

有几种方法可以解决这个问题,最好的方法是在启动容器时将开发机器的主机名传递给 docker-compose。如果您打开此项目的 docker-compose 文件,它会在 `<configuration>` 处有一个类似这样KAFKA_ADVERTISED_LISTENERS: ... LISTENER_DOCKER_EXTERNAL的条目${DOCKER_HOST_IP:-kafka}:9092,告诉 compose 尝试使用传入的主机名,否则默认使用 Kafka。请查看 compose 文件中的注释以了解如何修复它,并查看下面的参考部分以了解更多详细信息。

使用 curl 发送一些消息

现在请注意观察应用程序终端,然后在另一个终端窗口中,我们使用 curl 发送一些消息:

curl -X POST http://localhost:8080/api/message -d "yet more fun" -H "Content-Type: text/plain"
Enter fullscreen mode Exit fullscreen mode

你应该在执行 curl 的同一终端上看到响应,还要检查处理消息并将其打印到应用程序运行终端的消费者。

春天卡夫卡卷发

完毕

好了,大功告成。你现在已经创建了一个最简单的 Spring Boot 应用,它可以生成和消费 Kafka 消息。它之所以看起来如此简单,是因为我们使用的是 Spring Boot 和 spring-kafka 的默认配置。

如果您想了解更多关于 Spring Boot 或 Kafka 的工作原理,请查看下一节中的链接,您将在那里找到一些包含更多详细信息的参考资料。

我们将在另一篇文章中介绍如何测试消费者和生产者。祝您编程愉快!

参考

本文中创建的应用程序的源代码。

要设置包含 java、maven、docker 和 docker-compose 的环境,请查看示例教程中如何设置环境

如果您需要快速了解 Kafka,请参阅:Kafka 速成课程

有关如何使用 docker-compose 进行本地开发的一些见解,请查看这篇文章:一个即可运行所有命令,您还将在其中学习一些有用的 Kafka 命令。

如果您是 Spring Boot 新手,请查看Spring Boot 速成课程。

使用 Docker compose 环境变量来了解 Kafka Advertise Listener 的配置。

文章来源:https://dev.to/thegroo/spring-kafka- Producer-and-consumer-41oc