发布于 2026-01-06 0 阅读
0

在 Kubernetes 上轻松连接 Kafka!

在 Kubernetes 上轻松连接 Kafka!

本教程将通过示例演示如何在 Kubernetes 上使用Kafka Connect进行设置和使用。Strimzi

Kafka Connect 是一款用于在 Apache Kafka 和其他系统之间使用源连接器和接收器连接器实现可扩展、可靠的数据流的工具。虽然在 Kubernetes 上部署 Kafka Connect 集群并不难(只需“自己动手”!),但我很喜欢它能够借助自定义资源定义,使用Operator 模式Strimzi以Kubernetes 原生的方式实现这一点

除了引导/安装 Kafka Connect 之外,这也适用于扩展 Connect 集群、部署和管理连接器等操作(您将在本博客文章中看到实际操作)。

我们将逐步讲解如何在 Kubernetes 上部署 Kafka Connect 集群、安装连接器并进行测试——当然,所有这些都会用到kubectl一些工具YAML(例如 Azure 和 Azure Kubernetes 服务)。我将使用Azure 事件中心作为 Kafka 代理,Azure Kubernetes 服务作为 Kubernetes 集群——您也可以使用其他替代方案(例如,minikube在您的笔记本电脑上搭建本地集群)。

所有工件都可以在 GitHub 上找到。

Strimzi负责所有繁重的工作……如果你还不知道,这里简单概括一下。

Strimzi概述

Strimzi 的文档内容详尽,条理清晰,组织结构也非常好!以下大部分内容直接摘自文档。

StrimziStrimzi 简化了在 Kubernetes 集群中运行 Apache Kafka 的过程。它提供用于在 Kubernetes 上运行 Kafka 的容器镜像和 Operator。截至撰写本文时,它是Kubernetes项目的一部分。Cloud Native Computing FoundationSandbox

Strimzi Operators这些 Operator 对项目至关重要。它们经过专门设计,融合了专业的运维知识,能够高效地管理 Kafka。Operator 简化了以下流程:部署和运行 Kafka 集群及组件、配置和保护 Kafka 访问权限、升级和管理 Kafka,甚至还能处理主题和用户的管理。

下图从 10,000 英尺的高度概览了操作员的角色:

本文不会深入探讨如何使用 Strimzi 部署 Kafka,这可能是我以后博客中会涉及的内容。

先决条件

kubectl- https://kubernetes.io/docs/tasks/tools/install-kubectl/

如果您选择使用 Azure 事件中心、Azure Kubernetes 服务(或两者都使用),则需要一个Microsoft Azure 帐户。立即注册一个免费帐户吧!

Azure CLI或者Azure Cloud Shell,您可以选择安装Azure CLI(如果您还没有安装的话,应该很快就能安装!),或者直接使用浏览器中的Azure Cloud Shell 。

我将使用它Helm来安装Strimzi。以下是安装说明文档Helmhttps://helm.sh/docs/intro/install/

您也可以YAML直接使用这些文件进行安装Strimzi。请在此处查看快速入门指南:https://strimzi.io/docs/quickstart/latest/#proc-install-product-str

首先,让我们设置所需的 Azure 服务(如果您不使用 Azure,请跳过此部分,但请确保您拥有 Kafka 集群的详细信息,例如代理 URL 和身份验证凭据(如果适用))。

我建议将以下服务安装到同一个Azure 资源组中,这样可以方便地清理这些服务。

Azure 事件中心

Azure 事件中心是一个数据流平台和事件接收服务。它每秒可以接收和处理数百万个事件。它还提供了一个 Kafka 终结点,现有的基于 Kafka 的应用程序可以使用该终结点来替代运行自己的 Kafka 集群。事件中心支持 Apache Kafka 协议 1.0 及更高版本,并且可以与现有的 Kafka 客户端应用程序以及 Kafka 生态系统中的其他工具Kafka Connect(包括本博客中演示的工具)协同工作MirrorMaker

要设置 Azure 事件中心群集,您可以选择多种选项,包括Azure 门户Azure CLIAzure PowerShellARM 模板。设置完成后,您需要连接字符串(将在后续步骤中使用)来对事件中心进行身份验证-请参考本指南完成此步骤。

请确保您也创建一个事件中心(与 Kafka 主题相同),作为我们 Kafka Connect 连接器的目标(详情请参见后续章节)。

Azure Kubernetes 服务

Azure Kubernetes 服务 (AKS)简化了在 Azure 中部署托管 Kubernetes 集群的过程。它将大部分管理职责转移到 Azure,从而降低了 Kubernetes 管理的复杂性和运维开销。以下示例展示了如何使用Azure CLIAzure 门户ARM 模板设置 AKS 集群。

基础安装

首先,我们将安装StrimziKafka Connect,然后安装文件流源连接器。

安装 Strimzi

安装 StrimziHelm非常简单:

//add helm chart repo for Strimzi
helm repo add strimzi https://strimzi.io/charts/

//install it! (I have used strimzi-kafka as the release name)
helm install strimzi-kafka strimzi/strimzi-kafka-operator
Enter fullscreen mode Exit fullscreen mode

这将安装StrimziOperator(它实际上就是一个 Kubernetes 组件Deployment)、自定义资源定义和其他 Kubernetes 组件,例如Cluster RolesKubernetesCluster Role Bindings和 Kubernetes 容器。Service Accounts

更多详情请点击此链接

要删除,只需helm uninstall strimzi-kafka

要确认 Strimzi 操作员已部署,请检查其状态(过一段时间后Pod应变为已部署状态)。Running

kubectl get pods -l=name=strimzi-cluster-operator

NAME                                        READY   STATUS    RESTARTS   AGE
strimzi-cluster-operator-5c66f679d5-69rgk   1/1     Running   0          43s
Enter fullscreen mode Exit fullscreen mode

同时检查自定义资源定义:

kubectl get crd | grep strimzi

kafkabridges.kafka.strimzi.io           2020-04-13T16:49:36Z
kafkaconnectors.kafka.strimzi.io        2020-04-13T16:49:33Z
kafkaconnects.kafka.strimzi.io          2020-04-13T16:49:36Z
kafkaconnects2is.kafka.strimzi.io       2020-04-13T16:49:38Z
kafkamirrormaker2s.kafka.strimzi.io     2020-04-13T16:49:37Z
kafkamirrormakers.kafka.strimzi.io      2020-04-13T16:49:39Z
kafkas.kafka.strimzi.io                 2020-04-13T16:49:40Z
kafkatopics.kafka.strimzi.io            2020-04-13T16:49:34Z
kafkausers.kafka.strimzi.io             2020-04-13T16:49:33Z
Enter fullscreen mode Exit fullscreen mode

我想特别指出kafkas.kafka.strimzi.ioKubernetes 中哪些代表 Kafka 集群。我们将重点关注kafkaconnects.kafka.strimzi.io分别kafkaconnectors.kafka.strimzi.io代表 Kafka Connect 集群和连接器的组件。

我将略过其他组件,但您可以自行查找它们,例如集群角色。kubectl get clusterrole | grep strimzi

现在我们已经把“大脑”(Strimzi 操作员)连接好了,让我们开始使用它吧!

Kafka Connect

在部署 Kafka Connect 本身之前,我们需要创建一些辅助 Kubernetes 组件。

在继续操作之前,请克隆GitHub 项目。

git clone https://github.com/abhirockzz/strimzi-kafka-connect-eventhubs
cd strimzi-kafka-connect-eventhubs
Enter fullscreen mode Exit fullscreen mode

Kafka Connect 需要引用一个现有的 Kafka 集群(在本例中为 Azure 事件中心)。我们可以将集群的身份验证信息存储为KubernetesSecret实例,以便稍后在 Kafka Connect 定义中使用。

更新eventhubs-secret.yaml文件,使其包含 Azure 事件中心的凭据。在eventhubspassword属性中输入连接字符串。

例如

apiVersion: v1
kind: Secret
metadata:
  name: eventhubssecret
type: Opaque
stringData:
  eventhubsuser: $ConnectionString
  eventhubspassword: Endpoint=sb://<eventhubs-namespace>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<access-key>
Enter fullscreen mode Exit fullscreen mode

保持eventhubsuser: $ConnectionString不变

创建Secret

kubectl apply -f eventhubs-secret.yaml
Enter fullscreen mode Exit fullscreen mode

默认情况下,Kafka Connect 配置为将日志发送到stdout。我们将使用自定义配置(log4j)来确保日志/tmp/connect-worker.log除了 之外,还会存储到stdout——稍后您就会明白这样做的原因。

配置本身存储在一个log4j.properties

日志配置可以存储在一个文件中,ConfigMap该文件稍后将被 Kafka Connect 定义引用。详情请参阅https://strimzi.io/docs/latest/#con-kafka-connect-logging-deployment-configuration-kafka-connect

kubectl create configmap connect-logging-configmap --from-file=log4j.properties
Enter fullscreen mode Exit fullscreen mode

在部署 Kafka Connect 之前,我们先来看看它的定义。您可以在这里查看完整定义,但我将重点介绍其中的一些重要部分。

请注意,该资源kindKafkaConnect一个自定义资源定义。另一个有趣的地方是annotations(稍后我会解释)。

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
Enter fullscreen mode Exit fullscreen mode

bootstrapServers指向 Kafka 代理。对于高可用性集群中的节点,这可能是一个以逗号分隔的值。在本例中,它指的是 Azure 事件中心的单个 Kafka 端点(没错,这就是您所需要的!)。

spec:
  version: 2.4.0
  replicas: 1
  bootstrapServers: <eventhubs-namespace>.servicebus.windows.net:9093
Enter fullscreen mode Exit fullscreen mode

config这只是传统的 Kafka Connect 配置,类似于您在以下环境中使用的配置:connect-distributed.properties

  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
Enter fullscreen mode Exit fullscreen mode

身份验证部分仅指 Kubernetes Secret。在本例中,我们之前创建了一个 Secret,eventhubssecret其名称eventhubspassword包含 Azure 事件中心的连接字符串。

  authentication:
    type: plain
    username: $ConnectionString
    passwordSecret:
      secretName: eventhubssecret
      password: eventhubspassword
Enter fullscreen mode Exit fullscreen mode

这里ConfigMap引用log4j了配置。这将自动配置 Kafka Connect 以使用此配置。

  logging:
    type: external
    name: connect-logging-configmap
Enter fullscreen mode Exit fullscreen mode

tls此部分用于配置 TLS 证书(显而易见!)。对于事件中心,尽管我们使用 SASL 而非明文传输,但仍需要您使用 SSL(即设置security.protocolSASL_SSL)。我最初遇到了这个问题,但很快就得到了解答!因此添加了以下配置:

  tls:
    trustedCertificates: []
Enter fullscreen mode Exit fullscreen mode

太棒了!我们准备创建 Kafka Connect 实例。在此之前,请确保使用bootstrapServersAzure 事件中心主机名更新属性,例如:

spec:
  version: 2.4.0
  replicas: 1
  bootstrapServers: <replace-with-eventhubs-namespace>.servicebus.windows.net:9093
Enter fullscreen mode Exit fullscreen mode

创建 Kafka Connect 实例:

kubectl apply -f kafka-connect.yaml
Enter fullscreen mode Exit fullscreen mode

确认:

kubectl get kafkaconnects

NAME                 DESIRED REPLICAS
my-connect-cluster   1
Enter fullscreen mode Exit fullscreen mode

这将创建一个Deployment和一个相应的Pod

kubectl get pod -l=strimzi.io/cluster=my-connect-cluster

NAME                                          READY   STATUS    RESTARTS   AGE
my-connect-cluster-connect-5bf9db5d9f-9ttg4   1/1     Running   0          1h
Enter fullscreen mode Exit fullscreen mode

您的 Kubernetes 集群中已部署了 Kafka Connect!请使用以下命令查看日志。kubectl logs <pod name>

安装文件源连接器

让我们部署一个连接器!为了简化操作,我们将使用 Kafka Connect 默认自带的文件流源连接器。安装和管理连接器的常用方法是使用 Kafka Connect REST API,但还有另一种方法Strimzi。这是一种以 Kubernetes 为中心的方法,其中 Kafka Connect 连接器由一个名为 `<connector_name>` 的自定义资源定义表示KafkaConnector。我们只需要创建、更新或删除KafkaConnector包含连接器详细信息的 `<connector_name>` 定义,Strimzi剩下的工作就交给 Kubernetes 处理了!

Strimzi请查看文档了解详情:https://strimzi.io/docs/latest/#con-creating-managing-connectors-str

以下是我们连接器的定义:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 2
  config:
    file: "/tmp/connect-worker.log"
    topic: strimzi
Enter fullscreen mode Exit fullscreen mode

就像之前一样,让我们​​来了解一下每个组成部分的含义:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: my-source-connector
Enter fullscreen mode Exit fullscreen mode

这是一个KafkaConnector资源(由指定kind),其名称为my-source-connector

  labels:
    strimzi.io/cluster: my-connect-cluster
Enter fullscreen mode Exit fullscreen mode

这里指的是 Kafka Connect 集群——还记得上面 Kafka Connect 定义中的这个注释吗?

  annotations:
    strimzi.io/use-connector-resources: "true"
Enter fullscreen mode Exit fullscreen mode

这只是简单地“激活”该功能,确保我们能够使用 CRD 部署连接器,并且我们只需使用标签KafkaConnector引用资源名称即可。kafkaconnectstrimzi.io/cluster

spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 2
  config:
    file: "/tmp/connect-worker.log"
    topic: strimzi
Enter fullscreen mode Exit fullscreen mode

最后,在连接器规范中,我们定义了连接器的属性。注意到config指向/tmp/connect-worker.log文件的属性了吗?回想一下,我们修改了 Kafka Connect 实例,使其将日志推送到此文件。现在,我们已配置文件源连接器,使其能够流式传输此(日志)文件的内容,并将其发送到名为 `<topic_name>` 的 Kafka 主题strimzi。这是一个很好的演示,因为文件会不断更新,我们应该能够在目标 Kafka 主题(位于 Azure 事件中心)中看到每一行作为不同的消息。

我已将其用作strimzi主题名称。此名称必须与上一节(设置 Azure 事件中心时)创建的事件中心名称相同。

为了实际体验这一点,让我们部署连接器。

kubectl apply -f filestream-source-connector.yaml
Enter fullscreen mode Exit fullscreen mode

为确认这一点,只需列出连接器即可:

kubectl get kafkaconnectors

NAME                  AGE
my-source-connector   70s
Enter fullscreen mode Exit fullscreen mode

您还可以安装其他连接器。其中一种方法(也是我认为最简单的方法)是扩展Strimzi基础镜像,并在其上添加所需的连接器组件。请参阅文档:https://strimzi.io/docs/latest/#using-kafka-connect-with-plug-ins-str

Kafka Connect 实际应用……

首先,我们需要确认 Kafka Connect 日志是否已正确传输到目标位置。这一点很重要,因为我们将日志文件用作文件流连接器的数据源。为此,我们需要查看 Kafka Connect 的内部配置,Pod例如:

kubectl exec -it <kafka_connect_pod_name> -- tail -f /tmp/connect-worker.log
Enter fullscreen mode Exit fullscreen mode

为了简化操作,只需使用以下命令:

kubectl exec -it $(kubectl get pod -l=strimzi.io/cluster=my-connect-cluster -o jsonpath='{.items[0].metadata.name}') -- tail -f /tmp/connect-worker.log
Enter fullscreen mode Exit fullscreen mode

现在你应该能看到 Kafka Connect 日志了……

在另一个终端窗口中,启动一个连接到 Azure 事件中心主题的消费者进程。我使用的是kafkacatKafka CLI 中的控制台消费者,但还有其他选择,例如Kafka CLI 本身的控制台消费者,或者使用 Java、.NET、Go 等语言编写的程序化消费者(尽管在这种情况下可能有点杀鸡用牛刀)。

您在这里也应该看到相同的日志!例如

...
{"schema":{"type":"string","optional":false},"payload":"[2020-04-16 04:59:25,731] INFO WorkerSourceTask{id=my-source-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)"}
{"schema":{"type":"string","optional":false},"payload":"[2020-04-16 04:59:25,785] INFO WorkerSourceTask{id=my-source-connector-0} Finished commitOffsets successfully in 55 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)"}
...
Enter fullscreen mode Exit fullscreen mode

日志本身作为payload示例的一部分被捕获。[2020-04-16 04:59:25,785] INFO WorkerSourceTask{id=my-source-connector-0} Finished commitOffsets successfully in 55 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)

管理 Kafka Connect 资源

要扩展 Kafka Connect,只需更新副本数,例如,在本例中spec从1 更新1到 10 :2

spec:
  version: 2.4.0
  replicas: 2
Enter fullscreen mode Exit fullscreen mode

应用更新后的清单

kubectl apply -f kafka-connect.yaml
Enter fullscreen mode Exit fullscreen mode

请确保通过更新清单文件来增加副本数量,而不是通过更新Deployment配置项kubectl scale。这是因为,Strimzi操作员协调循环会检查KafkaConnect资源,发现replicas副本数量不足1,然后缩减Deployment副本数量。

Pods现在应该有两个了:

kubectl get pod -l=strimzi.io/cluster=my-connect-cluster


NAME                                          READY   STATUS    RESTARTS   AGE
my-connect-cluster-connect-5bf9db5d9f-9ttg4   1/1     Running   0          45m
my-connect-cluster-connect-5bf9db5d9f-pzn95   1/1     Running   0          1m5s
Enter fullscreen mode Exit fullscreen mode

my-connect-cluster-connect-5bf9db5d9f-pzn95是新的Pod

您可以更新连接器规范。例如,要分配更多任务,请tasksMax从更新25

...
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 5
...
Enter fullscreen mode Exit fullscreen mode

注意:这将重启连接器。

清理

要删除连接器和 Kafka Connect 实例:

kubectl delete -f filestream-source-connector.yaml
kubectl delete -f kafka-connect.yaml
Enter fullscreen mode Exit fullscreen mode

要清理 AKS 群集和 Azure 事件中心,只需删除资源组即可:

az group delete --name $AZURE_RESOURCE_GROUP --yes --no-wait
Enter fullscreen mode Exit fullscreen mode

这篇博文就到此结束了!

正如我之前提到的,Strimzi 的文档非常详尽,而且清晰易懂,方便查阅。最后,除了我在文章中提到的内容之外,我还想分享一些我觉得很有用的 Strimzi 文档参考资料:

Strimzi 文档参考

希望这对您在 Kubernetes 上开始使用 Kafka Connect 有所帮助 :)

文章来源:https://dev.to/azure/kafka-connect-on-kubernetes-the-easy-way-2co9