最简单的 Spring Kafka 生产者和消费者
参考
现在让我们使用 spring-kafka 构建并运行一个简单的 Kafka Consumer 和 Kafka Producer 示例。如果您需要本文中使用的 Kafka、Spring Boot 或 Docker 方面的帮助,或者想查看本文中的示例应用程序,请参阅下面的“参考资料”部分。
第一步是创建一个简单的Spring Boot Maven 应用程序,并确保 pom.xml 文件中包含 spring-kafka 依赖项。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
创建一个 Spring Kafka Consumer
现在让我们使用 spring-boot 的默认配置,用 spring-kafka 编写一个最简单的 Kafka 消费者。
创建一个名为 `<class>` 的类SimpleConsumer,并添加一个带有注解的方法@KakfaListener。
package io.stockgeeks.springkafka.springkafkaapp;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class SimpleConsumer {
@KafkaListener(id = "simple-consumer", topics = "simple-message")
public void consumeMessage(String message) {
System.out.println("Got message: " + message);
}
}
就是这样,只需要这些就够了,因为我们使用的是 spring-boot 的默认配置。
启动 Kafka 和 Zookeeper
正如我们在另一篇文章中详细看到的,我们将使用 docker-compose 来运行本地 Kafka 进行开发,让我们启动 Kafka 和 Zookeeper 容器:
docker-compose up -d
请确保容器正在运行:
docker ps
你应该能看到 Kafka 和 Zookeeper 的运行情况:
运行该应用程序
现在让我们编译并运行应用程序。如果您需要更详细的说明,请查看此帖子。运行以下命令来构建和运行应用程序:
mvn clean package
让我们开始吧:
mvn spring-boot:run
应用程序将启动,您将在标准输出上看到消费者的配置、正在使用的 Kafka 版本和一条消息Started SpringKafkaApplication in x seconds。
请确保应用程序保持运行状态,不要关闭运行该应用程序的终端窗口。现在,让我们使用 Kafka 控制台生产者生成几条消息,并观察消费者如何处理这些消息并将其记录下来。
使用 Kafka 控制台生产者生成消息
打开一个新的终端,进入 Kafka 运行容器,这样我们就可以使用控制台生产者了:
docker exec -it kafka /bin/bash
进入容器后cd /opt/kafka/bin,我们当前使用的镜像中 Kafka 的命令行脚本位于此文件夹中。如果您使用的是其他 Docker 镜像,这些脚本可能位于其他位置。
运行控制台生产者,即可向 Kafka 发送消息:
./kafka-console-producer.sh --broker-list localhost:9092 --topic simple-message
现在控制台会阻塞,您可以输入消息并按回车键。每次执行此操作,都会向标准输出发送一条消息simple-topic。尝试发送几条消息,并在运行 Spring Boot 应用程序的 shell 中观察应用程序的标准输出,看看消息是如何被处理和打印的。
编写一个简单的生产者
现在来创建我们的 Spring Kafka 生产者。创建一个名为 `producer` 的类SimpleProducer,我们将像创建消费者一样,再次使用默认配置来配置生产者。
package io.stockgeeks.springkafka.springkafkaapp;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class SimpleProducer {
private KafkaTemplate<String, String> simpleProducer;
public SimpleProducer(KafkaTemplate<String, String> simpleProducer) {
this.simpleProducer = simpleProducer;
}
public void send(String message) {
simpleProducer.send("simple-message", message);
}
}
编写一个端点
现在让我们创建一个简单的端点,它将接收一条文本消息并将其发布到 Kafka,目前我们始终返回 200 OK。
package io.stockgeeks.springkafka.springkafkaapp;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api")
public class MessageApi {
private final SimpleProducer simpleProducer;
public MessageApi(SimpleProducer simpleProducer) {
this.simpleProducer = simpleProducer;
}
@PostMapping("/message")
public ResponseEntity<String> message(@RequestBody String message) {
simpleProducer.send(message);
return ResponseEntity.ok("Message received: " + message);
}
}
构建并运行应用程序。
mvn clean package && mvn spring-boot:run
现在在开发机器上运行应用程序时,很有可能会出错,这是因为您的应用程序在普通主机网络中运行,而 Kafka 和 zookeeper 在“docker 网络”中运行。
有几种方法可以解决这个问题,最好的方法是在启动容器时将开发机器的主机名传递给 docker-compose。如果您打开此项目的 docker-compose 文件,它会在 `<configuration>` 处有一个类似这样
KAFKA_ADVERTISED_LISTENERS: ... LISTENER_DOCKER_EXTERNAL的条目${DOCKER_HOST_IP:-kafka}:9092,告诉 compose 尝试使用传入的主机名,否则默认使用 Kafka。请查看 compose 文件中的注释以了解如何修复它,并查看下面的参考部分以了解更多详细信息。
使用 curl 发送一些消息
现在请注意观察应用程序终端,然后在另一个终端窗口中,我们使用 curl 发送一些消息:
curl -X POST http://localhost:8080/api/message -d "yet more fun" -H "Content-Type: text/plain"
你应该在执行 curl 的同一终端上看到响应,还要检查处理消息并将其打印到应用程序运行终端的消费者。
完毕
好了,大功告成。你现在已经创建了一个最简单的 Spring Boot 应用,它可以生成和消费 Kafka 消息。它之所以看起来如此简单,是因为我们使用的是 Spring Boot 和 spring-kafka 的默认配置。
如果您想了解更多关于 Spring Boot 或 Kafka 的工作原理,请查看下一节中的链接,您将在那里找到一些包含更多详细信息的参考资料。
我们将在另一篇文章中介绍如何测试消费者和生产者。祝您编程愉快!
参考
本文中创建的应用程序的源代码。
要设置包含 java、maven、docker 和 docker-compose 的环境,请查看示例教程中如何设置环境。
如果您需要快速了解 Kafka,请参阅:Kafka 速成课程
有关如何使用 docker-compose 进行本地开发的一些见解,请查看这篇文章:一个即可运行所有命令,您还将在其中学习一些有用的 Kafka 命令。
如果您是 Spring Boot 新手,请查看Spring Boot 速成课程。
使用 Docker compose 环境变量来了解 Kafka Advertise Listener 的配置。
文章来源:https://dev.to/thegroo/spring-kafka- Producer-and-consumer-41oc



