发布于 2026-01-06 6 阅读
0

使用 Kafka 和 Python 构建流式欺诈检测系统

使用 Kafka 和 Python 构建流式欺诈检测系统

这是我的“使用 Apache Kafka 构建流式应用程序”系列文章的第二篇。如果您错过了第一篇,可以先阅读第一篇,了解本系列文章的缘起以及您将会看到的内容。

这次,我们将亲自动手,使用 Python 客户端创建我们第一个基于 Apache Kafka 的流式应用程序。我会尽量让它接近真实的Kafka 应用程序。

我写这篇博客文章是为了做一个教程。你不需要了解任何关于流处理或 Apache Kafka 的知识——我会边讲解边解释相关的概念和术语——你可以跟着我们一起构建系统。

我们将构建一个实时欺诈检测系统。我们将从零开始构建。我们将生成交易流并处理这些交易,以检测哪些交易可能是欺诈行为。

听起来很刺激吧?希望如此!

您可以在GitHub上找到支持这篇博文的完整代码

先决条件

我写这篇文章是为了方便大家跟着我的步骤一起开发各种应用程序。不过,需要先满足一些前提条件。

首先,我们需要在本地运行一个 Kafka 集群。这通常不会占用太多资源,但您可能需要一台性能不错的开发机器。

接下来,我们将使用Docker Compose运行集群,因为这将使我们尽可能接近生产系统——也就是真实世界的环境。所以请确保您已安装DockerDocker Compose

因此,我也希望您对 Docker 和 Docker Compose 有一定的了解。虽然我会解释更详细的内容,但您至少应该知道如何读取 .dockerDockerfile文件docker-compose.yml

最后,因为我们将使用Python构建应用程序,所以最后一个先决条件是具备 Python 语言和生态系统的基本知识。

差不多就是这样了,让我们开始吧!

启动本地 Kafka 集群

Apache Kafka 是一款软件,与所有软件一样,它运行在实际的计算机上——就这篇博文而言,就是你自己的计算机。

首先,我们来了解一些术语。Kafka 是一个分布式系统,它运行在一个集群中,也就是多台计算机(也称为节点)相互通信。在 Kafka 术语中,节点被称为代理(broker)。你的计算机将是唯一的代理。

Kafka 运行需要什么?实际上,它只需要两样东西——Kafka broker 和Zookeeper实例。

“什么是动物园管理员?!”

别担心——你其实不需要知道它是什么。简单来说,它是一个分布式协调软件,Apache Kafka 使用它来跟踪集群状态和成员。它是任何 Kafka 集群的重要组成部分,但我们不会过多赘述。

在引言中,我提到我们将使用Docker Compose来运行集群。让我们看看效果如何。

Docker Compose 配置

docker-compose.yml我们将使用以下文件。它使用了来自Confluent Platform 的Docker镜像——Confluent是 Apache Kafka 社区的主要参与者——并受到了他们的Kafka 单节点示例的启发。

它有两个服务:一个用于 Kafka 代理,一个用于 Zookeeper 实例。

version: '3'

services:

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Enter fullscreen mode Exit fullscreen mode

我会详细解释这里发生了什么——但如果你只想直接运行,也可以跳到下一部分。

这项zookeeper服务是——没错——为Zookeeper提供的。它使用 Confluent Platform 的 Docker 镜像cp-zookeeper。两个环境变量定义了 Zookeeper 的可访问端口(我们将把这个端口传递给代理)及其基本时间单位(一个必需的配置细节)。

其次,这项kafka服务就是我们的Kafka broker。在为数不多的环境变量中,我们需要关注的两个是 ` KAFKA_ZOOKEEPER_CONNECTZookeeper` 和 `connection`。`Zookeeper` 告诉 broker 它在哪里可以找到 Zookeeper,而KAFKA_ADVERTISED_LISTENERS`connection` 定义了我们可以从其他应用程序连接到 broker 的位置。您无需关心副本因子;我只需告诉您,它设置为 1,因为集群中只有一个 broker。

现在我们已经定义了集群的 Docker Compose 配置,接下来我们……

启动集群!

现在是时候启动本地 Kafka 集群了。怎么做呢?如果您了解 Docker Compose,那就很简单。只需进入项目根目录(Kafka 文件所在的docker-compose.yml目录)并运行以下命令:

$ docker-compose up
Enter fullscreen mode Exit fullscreen mode

你会看到控制台里出现一大堆日志——别担心!这是正常的。Kafka 和 Zookeeper 在启动时都会输出很多信息。要知道它们何时初始化完成,你可以在另一个终端运行以下命令。它会等待服务broker输出“已启动”消息。

$ docker-compose logs -f broker | grep started
broker_1 | INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
Enter fullscreen mode Exit fullscreen mode

如果成功了,恭喜!你现在已经搭建并运行了一个单节点 Kafka 集群。不过它本身可能用处不大。不如我们开始传输一些数据吧?

进入:欺诈检测系统

没错——现在正是我们大展身手的时候!🎉

我本来可以详细讲解如何制作一个“Hello, World!”流式应用程序,在这个应用程序中,用户可以在一端(生产者)输入消息,然后在另一端(消费者)实时接收这些消息。但是,Confluent 的 Docker 快速入门指南和其他地方已经有完全相同的教程了。

相反,我希望我们构建一个能够解决实际业务用例的真正系统。在控制文章篇幅的同时做到这一点并不容易,但我们会看看结果如何。

我们将构建的系统是一个简单的欺诈检测机制——类似于您在银行和交易监控系统中可能遇到的那种机制。

我们将在一端生成虚假交易,并在另一端过滤并记录那些可疑的交易。这需要一个交易生成器(因为我们需要数据进行处理)和一个欺诈检测器。这两个应用程序都将在 Docker 容器中运行,并与我们的 Kafka 集群交互。

欺诈检测系统框图。

既然我们已经了解了这个系统的基本思路,那就让我们开始编写一些代码吧?

创建应用框架

我们将首先创建交易生成器和欺诈检测器的容器化框架。它们都是简单的 Python 应用程序,因此我们将Dockerfile在两种情况下使用相同的代码:

# Dockerfile
FROM python:3.6

WORKDIR /usr/app

ADD ./requirements.txt ./
RUN pip install -r requirements.txt
ADD ./ ./

CMD ["python", "app.py"]
Enter fullscreen mode Exit fullscreen mode

我们将创建两个文件夹来存放这些内容Dockerfile:一个用于交易生成器,一个用于欺诈检测器。

我们还会引入一个 Python 依赖项——Kafka Python,它是 Kafka(以及其他服务)的 Python 客户端。我们将使用它来构建与 Kafka 集群交互的应用程序。

# requirements.txt
kafka-python
Enter fullscreen mode Exit fullscreen mode

实际的 Python 代码将位于app.py每个应用程序的文件中。因此,最终我们会得到如下的文件夹结构:

.
├── docker-compose.yml
├── detector
│   ├── Dockerfile
│   ├── app.py
│   └── requirements.txt
└── generator
    ├── Dockerfile
    ├── app.py
    └── requirements.txt
Enter fullscreen mode Exit fullscreen mode

稍后我们将把这些服务集成到开发环境中。但在此之前,让我先带您了解如何将 Kafka 放入独立的 Docker Compose 文件中,这样我们就可以像使用实际生产集群一样使用它。

隔离 Kafka 集群

在生产环境中,Kafka 始终以服务的形式运行。这意味着 Kafka 集群独立于使用它的应用程序运行。后者只需连接到集群即可发布或读取事件。

我们正在尝试构建一个真实世界的系统,那么我们何不让我们的应用程序也与一个独立的、类似生产环境的 Kafka 集群进行交互呢?

为了在本地实现这一点,我们将把我们的服务迁移zookeeperbroker一个单独的 Docker Compose 配置中。我们把它放在一个docker-compose.kafka.yml文件中。稍后,我们将docker-compose.yml仅对应用程序服务使用常规配置。

# docker-compose.kafka.yml
version: '3'

services:
    # ... zookeeper and broker as before ...
Enter fullscreen mode Exit fullscreen mode

你可能觉得这样就足够了,但其实还缺少一些东西。为了让两个 Docker Compose 文件都能访问同一个网络(记住,默认情况下每个 Docker Compose 文件都使用自己的私有网络),我们需要使用一个外部 Docker 网络。我们把它命名为 ` <network_name> kafka-network`。现在就来创建这个网络吧:

$ docker network create kafka-network
85351a30f253b0fb8ae197ea07ddeddd0412c8a789e910dd4a3f424057122d30
Enter fullscreen mode Exit fullscreen mode

现在我们可以添加networks以下部分docker-compose.kafka.yml

# docker-compose.kafka.yml
version: '3'

services:
    # ... zookeeper and broker as before ...

# NEW
networks:
  default:
    external:
      name: kafka-network
Enter fullscreen mode Exit fullscreen mode

(如果您有兴趣,可以阅读更多关于Docker Compose 网络方面的内容)。

太好了,我们现在已经在单独的 Docker Compose 配置中部署了 Kafka 集群。那又怎样呢?

这意味着我们可以将集群视为一个完全隔离的系统。您可以独立于其他 Docker 应用程序启动或关闭它——就像在现实世界中一样。

实际上,要修改这个 Docker Compose 文件,我们需要使用-f标志来指定我们要使用的配置。

例如,要启动 Kafka 集群,您现在将使用:

docker-compose -f docker-compose.kafka.yml up
Enter fullscreen mode Exit fullscreen mode

如果您想访问日志:

docker-compose -f docker-compose.kafka.yml logs broker
Enter fullscreen mode Exit fullscreen mode

你明白我的意思了。

现在,我们可以创建一个docker-compose.yml具有相同网络配置的空实例,稍后我们将向其中添加交易生成器和欺诈检测服务:

# docker-compose.yml
version: '3'

services:
    # TODO: generator and detector coming soon

# Give this composition access to the Kafka network
networks:
  default:
    external:
      name: kafka-network
Enter fullscreen mode Exit fullscreen mode

转移话题就到此为止——我想我们一切就绪!让我们开始构建流媒体欺诈检测系统吧。

构建交易生成器

要进行流处理,首先需要一个数据流。这时就需要用到事务生成器了。我们将使用它来创建一个事务流

在 Kafka 中,您可以构建几种不同类型的应用程序。其中一类应用程序是生产者

生产者执行我们所需的操作——它们向 Kafka 集群发布消息。这些消息将存储在我们代理服务器上的一个称为“主题”的存储位置——主题是磁盘上用于存储消息的命名存储。

Kafka 生产者的工作原理。很简单,对吧?

现在,我有个好消息。我们使用的 Python 客户端(Kafka Python)允许我们构建生产者。🎉

所以让我们使用Kafka Python 的生产者 APItransactions主题发送消息。

我很难不在这里复制粘贴一些代码。这不是 Kafka Python 客户端的教程,所以我只会一步一步地演示。如果你想了解更多细节,可以直接参考Kafka Python 的官方文档

KafkaProducer首先,让我们从 Kafka Python 库中获取它。

from kafka import KafkaProducer
Enter fullscreen mode Exit fullscreen mode

然后,我们从环境变量中获取代理的 URL(最佳实践仍然适用)。生产者使用该 URL 来引导其与 Kafka 集群的连接。

import os
# ...
KAFKA_BROKER_URL = os.environ.get('KAFKA_BROKER_URL')
Enter fullscreen mode Exit fullscreen mode

然后,我们可以实例化实际的生产者。它公开了一个简单的 API(详见文档,用于向 Kafka 主题发送消息。

producer = KafkaProducer(bootstrap_servers=KAFKA_BROKER_URL)
Enter fullscreen mode Exit fullscreen mode

下一步是——我们该如何处理生产者?嗯,我们可以用它来发送消息。发送什么消息呢?问得好!我们暂且跳过这个问题,假设我们发送空字符串。

现在,事务生成器的目标是模拟系统中无限流动的事务流。因此,我们将运行一个无限循环,不断向某个主题发送这些消息。

附注——我喜欢参考《如何粉刷自行车棚:Kafka 主题命名规范》中的建议来命名主题。因此,我将使用queueing.transactions`<transactions>` 作为事务主题名称,它实际上代表了一个待处理事务的队列。

以下是实现这种循环的方法:

from time import sleep
# ...
while True:
    message = ''  # TODO
    # Kafka messages are plain bytes => need to `.encode()` the string message
    producer.send('queueing.transactions', value=message.encode())
    sleep(1)  # Sleep for one second before producing the next transaction
Enter fullscreen mode Exit fullscreen mode

最终,我们得到的 Python 文件如下:

# generator/app.py
import os
from time import sleep
from kafka import KafkaProducer

KAFKA_BROKER_URL = os.environ.get('KAFKA_BROKER_URL')

if __name__ == '__main__':
    producer = KafkaProducer(bootstrap_servers=KAFKA_BROKER_URL)
    while True:
        message = ''  # TODO
        producer.send('queueing.transactions', value=message.encode())
        sleep(1)
Enter fullscreen mode Exit fullscreen mode

现在,让我们把这个generator应用添加到我们的docker-compose.yml配置中:

# docker-compose.yml
services:
  # ...
  generator:
    build: ./generator
    environment:
      KAFKA_BROKER_URL: broker:9092
Enter fullscreen mode Exit fullscreen mode

在启动此应用程序之前,我们需要用代表交易的实际值填充消息

为此,我构建了一个实用create_random_transaction()函数,我们可以用它来创建随机交易。这些交易将包含一个source账户、一个target账户、一个amountcurrency

很简单,所以我直接把transactions.py文件复制粘贴过来:

# generator/transactions.py
from random import choices, randint
from string import ascii_letters, digits

account_chars: str = digits + ascii_letters

def _random_account_id() -> str:
    """Return a random account number made of 12 characters."""
    return ''.join(choices(account_chars, k=12))

def _random_amount() -> float:
    """Return a random amount between 1.00 and 1000.00."""
    return randint(100, 1000000) / 100

def create_random_transaction() -> dict:
    """Create a fake, randomised transaction."""
    return {
        'source': _random_account_id(),
        'target': _random_account_id(),
        'amount': _random_amount(),
        # Keep it simple: it's all euros
        'currency': 'EUR',
    }
Enter fullscreen mode Exit fullscreen mode

太好了!我们现在可以用一条虚假的交易信息替换这条空白消息:

from transactions import create_random_transaction

# ...
transaction: dict = create_random_transaction()
message: str = json.dumps(transaction)
producer.send('queueing.transactions', value=message.encode())
# ...
Enter fullscreen mode Exit fullscreen mode

让我们通过增强generator应用程序的可配置性来完成最后的完善工作——这始终是一个很好的做法。让我们:

  • 将该queueing.transactions主题提取到KAFKA_TRANSACTIONS_TOPIC环境变量中
  • 将睡眠时间设置为可通过TRANSACTIONS_PER_SECOND环境变量配置。
  • 使用KafkaProducer'svalue_serializer参数自动将字典中的消息序列化为 JSON 字节。

这是交易生成器的最终版本

# generator/app.py
import os
import json
from time import sleep
from kafka import KafkaProducer
from transactions import create_random_transaction

KAFKA_BROKER_URL = os.environ.get('KAFKA_BROKER_URL')
TRANSACTIONS_TOPIC = os.environ.get('TRANSACTIONS_TOPIC')
TRANSACTIONS_PER_SECOND = float(os.environ.get('TRANSACTIONS_PER_SECOND'))
SLEEP_TIME = 1 / TRANSACTIONS_PER_SECOND

if __name__ == '__main__':
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BROKER_URL,
        # Encode all values as JSON
        value_serializer=lambda value: json.dumps(value).encode(),
    )
    while True:
        transaction: dict = create_random_transaction()
        producer.send(TRANSACTIONS_TOPIC, value=transaction)
        print(transaction)  # DEBUG
        sleep(SLEEP_TIME)
Enter fullscreen mode Exit fullscreen mode

以下是 Docker Compose 服务的样子:

# docker-compose.yml
services:
  generator:
      build: ./generator
      environment:
        KAFKA_BROKER_URL: broker:9092
        TRANSACTIONS_TOPIC: queueing.transactions
        TRANSACTIONS_PER_SECOND: 1000
Enter fullscreen mode Exit fullscreen mode

试用交易生成器

现在,最好是让程序运行起来,并检查一切是否正常。毕竟,如果你自己都无法验证,我又有什么资格告诉你它一定有效呢?

为此,我们首先要确保本地 Kafka 集群已启动,然后启动主 Docker Compose:

$ docker-compose up
Enter fullscreen mode Exit fullscreen mode

这将启动generator生产者。

我们的调试print程序会将 Python 中的交易输出到控制台,但这并不能保证消息能够有效地发送到主题。

为了验证这一点,我们可以使用 Confluent Platform 内置并包含在confluent/kafka-cpDocker 镜像中的脚本。该脚本名为 `confluent_topic_contents` kafka-console-consumer。它允许您读取主题的内容并将其打印到控制台。

该命令需要在容器内运行broker,因此有点晦涩难懂,但命令如下(您需要在单独的终端中运行它):

docker-compose -f docker-compose.kafka.yml exec broker kafka-console-consumer --bootstrap-server localhost:9092 --topic queueing.transactions --from-beginning
Enter fullscreen mode Exit fullscreen mode

--from-beginning标志表示命令将读取整个主题,而不仅仅是读取新消息。

您可以让它运行一段时间,然后按下Ctrl+C停止键kafka-console-consumer,查看已读消息总数:

...
{"source": "Jpe2zq1QDYTn", "target": "lLbLnl0FSr7T", "amount": 254.83, "currency": "EUR"}
{"source": "mfGWAdLKW8jh", "target": "ede8FQpjBeej", "amount": 785.22, "currency": "EUR"}
{"source": "GufdRdi1TElh", "target": "u0ANirFVu76B", "amount": 407.64, "currency": "EUR"}
^C
Processed a total of 4783 messages
Enter fullscreen mode Exit fullscreen mode

太棒了!它奏效了——虚假交易已成功生成。

您现在可以停止发电机了:Ctrl+C或者$ docker-compose down

那么,我们目前取得了哪些进展?

  • 本地 Kafka 集群已启动并运行🚀
  • 一个基于生产者的应用程序,它生成虚假交易并将其发布到 Kafka 主题📝

以下是架构图:

交易生成器:检查完毕!

我们现在只差一样东西了——那就是真正的欺诈检测器,它可以处理大量的交易并检测出可疑的交易!

构建欺诈检测器

欺诈检测器是流处理应用程序的典型示例

它以交易流作为输入,执行某种过滤,然后将结果输出到两个单独的流中——合法的交易流和可疑的交易流,这种操作也称为分支

检测器将交易流分成两个独立的流。✂️

除了向主题发布消息的生产者之外,还有另一种 Kafka 应用程序用于从主题中读取消息。它们被称为消费者。实际上,我们已经通过kafka-console-consumer上面的脚本使用了一个消费者!

消费者可以读取一个或多个主题中的消息。实际上,他们订阅这些主题,Kafka 会在消息发布时将其广播给他们。这是使用 Kafka 构建的流式应用程序响应式响应能力的核心特性。

我们又一次幸运了!Kafka Python 客户端允许我们用 Python 构建消费者。我们将使用Kafka Python 的 Consumer API来实现这一点。

不过,欺诈检测器并非普通的消费者,而是一个流式应用程序。因此,它使用消费者读取消息,然后对这些消息进行处理,并将处理后的消息输出到两个输出主题之一。所以我们需要一个消费者和一个生产者

首先,我们来编写消费者部分。我会一步一步地讲解,最后向您展示最终结果。

让我们先从获取KafkaConsumer

# detector/app.py
from kafka import KafkaConsumer
Enter fullscreen mode Exit fullscreen mode

让我们从环境变量中获取代理 URL 和交易主题:

import os
# ...
KAFKA_BROKER_URL = os.environ.get('KAFKA_BROKER_URL')
TRANSACTIONS_TOPIC = os.environ.get('TRANSACTIONS_TOPIC')
Enter fullscreen mode Exit fullscreen mode

现在我们可以实例化消费者了。我们知道事务主题包含 JSON 编码的值(由事务生成器生成),因此我们可以使用KafkaConsumer`s`将原始字节自动加载到 Python 字典中。这与我们之前使用`s`value_deserializer类似。KafkaProducervalue_serializer

import json
# ...
consumer = KafkaConsumer(
    TRANSACTIONS_TOPIC,
    bootstrap_servers=KAFKA_BROKER_URL,
    value_deserializer=lambda value: json.loads(value),
)
Enter fullscreen mode Exit fullscreen mode

现在我们可以通过遍历消费者来处理消息流:

for message in consumer:
    transaction: dict = message.value
    # TODO: determine if transaction is suspicious
    # TODO: output transaction in the correct topic (legit or fraud)
Enter fullscreen mode Exit fullscreen mode

现在有一个有趣的问题——我们如何确定一笔交易是否属于欺诈?

有很多选择,这很大程度上取决于业务用例。

在现实世界中,检测欺诈是一个难题,因为交易中有很多迹象表明存在欺诈行为。例如,如果某人在24小时内从50个不同的账户收到50笔金额均为1000欧元的交易,这很可能是可疑的。你甚至可以考虑添加人工智能层来进一步提高预测的准确性。

虽然可行,但用 Kafka 实现这类规则比我们这里能讨论的要复杂得多。我们这里只关注如何使用 Kafka 将流处理方法付诸实践

为了简化说明,我们得稍微“作弊”一下。假设我们面对的是一个虚构的银行系统,该系统规定单笔转账超过900欧元是违法的。因此,任何amount超过900欧元的交易都可能被视为欺诈。

以下是用于判断交易是否可疑的相应函数:

def is_suspicious(transaction: dict) -> bool:
    return transaction['amount'] >= 900
Enter fullscreen mode Exit fullscreen mode

猜猜看——我们现在有办法决定将交易重定向到哪个主题了!

现在我们准备实现检测器的核心部分。为了将消息生成到主题中,我们需要一个KafkaProducer……让我们实例化它——就像我们为交易生成器所做的那样。

from kafka import KafkaProducer
# ...
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER_URL,
    value_serializer=lambda value: json.dumps(value).encode(),
)
Enter fullscreen mode Exit fullscreen mode

现在我们可以利用is_suspicious()生产者来确定交易应该发送到哪个主题,然后将交易发送到该主题:

import os
# ...
LEGIT_TOPIC = os.environ.get('LEGIT_TOPIC')
FRAUD_TOPIC = os.environ.get('FRAUD_TOPIC')
# ...
for message in consumer:
    transaction: dict = message.value
    topic = FRAUD_TOPIC if is_suspicious(transaction) else LEGIT_TOPIC
    producer.send(topic, value=transaction)
Enter fullscreen mode Exit fullscreen mode

搞定了!我们已经把所有部件连接起来了。

在进行测试之前,这是欺诈检测器的最终版本

# detector/app.py
import os
import json
from kafka import KafkaConsumer, KafkaProducer

KAFKA_BROKER_URL = os.environ.get('KAFKA_BROKER_URL')
TRANSACTIONS_TOPIC = os.environ.get('TRANSACTIONS_TOPIC')
LEGIT_TOPIC = os.environ.get('LEGIT_TOPIC')
FRAUD_TOPIC = os.environ.get('FRAUD_TOPIC')

def is_suspicious(transaction: dict) -> bool:
    """Determine whether a transaction is suspicious."""
    return transaction['amount'] >= 900

if __name__ == '__main__':
    consumer = KafkaConsumer(
        TRANSACTIONS_TOPIC,
        bootstrap_servers=KAFKA_BROKER_URL,
        value_deserializer=lambda value: json.loads(value),
    )
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BROKER_URL,
        value_serializer=lambda value: json.dumps(value).encode(),
    )
    for message in consumer:
        transaction: dict = message.value
        topic = FRAUD_TOPIC if is_suspicious(transaction) else LEGIT_TOPIC
        producer.send(topic, value=transaction)
        print(topic, transaction)  # DEBUG
Enter fullscreen mode Exit fullscreen mode

以及我们配置中对应的服务docker-compose.yml

# docker-compose.yml
services:
  # ... generator here ...
  detector:
    build: ./detector
    environment:
      KAFKA_BROKER_URL: broker:9092
      TRANSACTIONS_TOPIC: queueing.transactions
      LEGIT_TOPIC: streaming.transactions.legit
      FRAUD_TOPIC: streaming.transactions.fraud
Enter fullscreen mode Exit fullscreen mode

试用欺诈检测器

让我们来试一试!和之前一样,我们先启动 Docker Compose,这次它会同时启动交易生成器和欺诈检测器:

$ docker-compose up
Enter fullscreen mode Exit fullscreen mode

就像我们在测试生成器时所做的那样,我们可以通过启动两个kafka-console-consumers(每个输出主题一个)来验证检测器是否正确地消费、处理和生成事务。

以下是执行此操作的命令:

docker-compose -f docker-compose.kafka.yml exec detector kafka-console-consumer --bootstrap-server localhost:9092 --topic streaming.transactions.legit
docker-compose -f docker-compose.kafka.yml exec detector kafka-console-consumer --bootstrap-server localhost:9092 --topic streaming.transactions.fraud
Enter fullscreen mode Exit fullscreen mode

让我们一起来了解一下这些主题的内容吧!

  • 其中streaming.transactions.legit包含合法交易(金额低于 900 欧元),正如预期:
...
{"source": "v16xO8Oj7s4u", "target": "QgvC9nMnBQMO", "amount": 231.67, "currency": "EUR"}
{"source": "wqduJdB5focL", "target": "sxKNYdlSQqQl", "amount": 229.57, "currency": "EUR"}
{"source": "VNWKUvsMOOWA", "target": "7wWynSlnuYpP", "amount": 752.4, "currency": "EUR"}
{"source": "bjt8hci7q8Wm", "target": "LobJtJJB3RDt", "amount": 410.42, "currency": "EUR"}
{"source": "E6NpRQcSJ3es", "target": "E9QeU7qfekKB", "amount": 659.23, "currency": "EUR"}
{"source": "IijVKIGtiGrc", "target": "eWVPvLGm8JLi", "amount": 846.07, "currency": "EUR"}
{"source": "yGfZ1Xa6k1r0", "target": "N5RvY7RO5sQF", "amount": 217.46, "currency": "EUR"}
...
^C
Processed a total of 13593 messages
Enter fullscreen mode Exit fullscreen mode
  • 唯一streaming.transactions.fraud包含可疑交易(金额超过 900 欧元)的记录也在意料之中!
...
{"source": "jwRMZyzGAsmG", "target": "i33smnlpnYtd", "amount": 980.05, "currency": "EUR"}
{"source": "WcmydQNdDxya", "target": "ykFPmUSysJuJ", "amount": 914.81, "currency": "EUR"}
{"source": "UuCOtfIqc4oU", "target": "PrD271ASM3mZ", "amount": 911.9, "currency": "EUR"}
{"source": "ZHdwSxcIi1gv", "target": "CNLyxBAHT2lW", "amount": 925.55, "currency": "EUR"}
{"source": "gQUtarjghjWx", "target": "ABKEhcJuYiEo", "amount": 945.14, "currency": "EUR"}
{"source": "NS2HA0HoMQPt", "target": "h989MZ4lcDqg", "amount": 901.25, "currency": "EUR"}
{"source": "0w22LoewVfYW", "target": "bUejK0tbzLT2", "amount": 955.0, "currency": "EUR"}
...
^C
Processed a total of 1519 messages
Enter fullscreen mode Exit fullscreen mode

如您所见,大约十分之一的消息被标记为欺诈交易。考虑到交易金额在 0 到 1000 欧元之间,触发阈值为 900 欧元,我们预计欺诈交易的比例为 10%,所以一切似乎运行正常!

退后一步

就这样——我们现在有了一个可用的欺诈检测流程

  • 一个本地 Kafka 集群,充当集中式流平台
  • 一个向集群生成交易的交易生成器。
  • 欺诈检测器,可处理交易、检测潜在的欺诈交易,并将结果输出到两个不同的主题中。

你可能想知道——接下来会发生什么?

这个系统有很多可以改进或扩展的地方。以下仅列举几个想法:

  • 实施更完善的欺诈检测算法
  • 将有关欺诈交易的警报或报告发送到某个地方:电子邮件、通知、Slack 机器人——任何方式都可以!

你可能还会问——部署?嗯,我故意略过了这部分。虽然在生产环境中部署一组 Kafka 应用应该与你通常的工作流程并无太大区别,但它可能需要遵循自身的最佳实践。另一方面,在生产环境中部署 Kafka 集群则完全是另一个话题(此处双关)。

虽然我们已经成功构建了一个流式系统,但我们甚至还没有触及Kafka 生态系统所能做到的冰山一角。

熟悉 Kafka 生态系统的人可能会想:“KSQL 呢?”、“Kafka Streams 呢?”、“Schema Registry 呢?”。嗯,是的,这些都有,我们以后可能会深入了解一下。

本教程旨在向大家展示Apache Kafka 的核心——主题、生产者和消费者,以及如何安排它们来构建现实世界的流式应用程序。

流媒体播放所有内容

如果你已经阅读并和我一起构建了这个欺诈检测系统——恭喜!你刚刚使用 Apache Kafka 和 Python 构建了你的第一个真正的流式应用程序

我们可以为我们取得的成就感到自豪。我们结合了 Apache Kafka、Python 和 Docker,解决了一个实际的业务问题。

这只是一个入门示例,但我希望你能开始想象像 Apache Kafka 这样的流式平台如何开启一个全新的系统和应用世界。想想看,如果用 REST API 和传统的 HTTP 调用来构建这个系统,该有多么困难。我猜:根本不可能。

如果你迫不及待地想知道接下来会发生什么,我有个好消息:我们才刚刚开始!👊

下一篇文章,我们将回归 Kafka 的核心概念,并尝试进行一次范式转变。Apache Kafka 的核心在于流式数据处理,因此我们需要从请求思维转变为事件思维

在此之前,你可以在GitHub上找到完整的代码,所以请继续体验欺诈检测器,并敬请期待更多 Kafka 相关内容!💻

下一篇:突发新闻:万物皆事件!(流媒体、卡夫卡与你)

保持联系!

如果你喜欢这篇文章,可以在推特上关注我,获取更新、公告和新闻。🐤

文章来源:https://dev.to/florimondmanca/building-a-streaming-fraud-detection-system-with-kafka-and-python-neg