使用 Kafka 和 Python 构建流式欺诈检测系统
这是我的“使用 Apache Kafka 构建流式应用程序”系列文章的第二篇。如果您错过了第一篇,可以先阅读第一篇,了解本系列文章的缘起以及您将会看到的内容。
这次,我们将亲自动手,使用 Python 客户端创建我们第一个基于 Apache Kafka 的流式应用程序。我会尽量让它接近真实的Kafka 应用程序。
我写这篇博客文章是为了做一个教程。你不需要了解任何关于流处理或 Apache Kafka 的知识——我会边讲解边解释相关的概念和术语——你可以跟着我们一起构建系统。
我们将构建一个实时欺诈检测系统。我们将从零开始构建。我们将生成交易流并处理这些交易,以检测哪些交易可能是欺诈行为。
听起来很刺激吧?希望如此!
您可以在GitHub上找到支持这篇博文的完整代码。
先决条件
我写这篇文章是为了方便大家跟着我的步骤一起开发各种应用程序。不过,需要先满足一些前提条件。
首先,我们需要在本地运行一个 Kafka 集群。这通常不会占用太多资源,但您可能需要一台性能不错的开发机器。
接下来,我们将使用Docker Compose运行集群,因为这将使我们尽可能接近生产系统——也就是真实世界的环境。所以请确保您已安装Docker和Docker Compose。
因此,我也希望您对 Docker 和 Docker Compose 有一定的了解。虽然我会解释更详细的内容,但您至少应该知道如何读取 .dockerDockerfile文件docker-compose.yml。
最后,因为我们将使用Python构建应用程序,所以最后一个先决条件是具备 Python 语言和生态系统的基本知识。
差不多就是这样了,让我们开始吧!
启动本地 Kafka 集群
Apache Kafka 是一款软件,与所有软件一样,它运行在实际的计算机上——就这篇博文而言,就是你自己的计算机。
首先,我们来了解一些术语。Kafka 是一个分布式系统,它运行在一个集群中,也就是多台计算机(也称为节点)相互通信。在 Kafka 术语中,节点被称为代理(broker)。你的计算机将是唯一的代理。
Kafka 运行需要什么?实际上,它只需要两样东西——Kafka broker 和Zookeeper实例。
“什么是动物园管理员?!”
别担心——你其实不需要知道它是什么。简单来说,它是一个分布式协调软件,Apache Kafka 使用它来跟踪集群状态和成员。它是任何 Kafka 集群的重要组成部分,但我们不会过多赘述。
在引言中,我提到我们将使用Docker Compose来运行集群。让我们看看效果如何。
Docker Compose 配置
docker-compose.yml我们将使用以下文件。它使用了来自Confluent Platform 的Docker镜像——Confluent是 Apache Kafka 社区的主要参与者——并受到了他们的Kafka 单节点示例的启发。
它有两个服务:一个用于 Kafka 代理,一个用于 Zookeeper 实例。
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
我会详细解释这里发生了什么——但如果你只想直接运行,也可以跳到下一部分。
这项zookeeper服务是——没错——为Zookeeper提供的。它使用 Confluent Platform 的 Docker 镜像cp-zookeeper。两个环境变量定义了 Zookeeper 的可访问端口(我们将把这个端口传递给代理)及其基本时间单位(一个必需的配置细节)。
其次,这项kafka服务就是我们的Kafka broker。在为数不多的环境变量中,我们需要关注的两个是 ` KAFKA_ZOOKEEPER_CONNECTZookeeper` 和 `connection`。`Zookeeper` 告诉 broker 它在哪里可以找到 Zookeeper,而KAFKA_ADVERTISED_LISTENERS`connection` 定义了我们可以从其他应用程序连接到 broker 的位置。您无需关心副本因子;我只需告诉您,它设置为 1,因为集群中只有一个 broker。
现在我们已经定义了集群的 Docker Compose 配置,接下来我们……
启动集群!
现在是时候启动本地 Kafka 集群了。怎么做呢?如果您了解 Docker Compose,那就很简单。只需进入项目根目录(Kafka 文件所在的docker-compose.yml目录)并运行以下命令:
$ docker-compose up
你会看到控制台里出现一大堆日志——别担心!这是正常的。Kafka 和 Zookeeper 在启动时都会输出很多信息。要知道它们何时初始化完成,你可以在另一个终端运行以下命令。它会等待服务broker输出“已启动”消息。
$ docker-compose logs -f broker | grep started
broker_1 | INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
如果成功了,恭喜!你现在已经搭建并运行了一个单节点 Kafka 集群。不过它本身可能用处不大。不如我们开始传输一些数据吧?
进入:欺诈检测系统
没错——现在正是我们大展身手的时候!🎉
我本来可以详细讲解如何制作一个“Hello, World!”流式应用程序,在这个应用程序中,用户可以在一端(生产者)输入消息,然后在另一端(消费者)实时接收这些消息。但是,Confluent 的 Docker 快速入门指南和其他地方已经有完全相同的教程了。
相反,我希望我们构建一个能够解决实际业务用例的真正系统。在控制文章篇幅的同时做到这一点并不容易,但我们会看看结果如何。
我们将构建的系统是一个简单的欺诈检测机制——类似于您在银行和交易监控系统中可能遇到的那种机制。
我们将在一端生成虚假交易,并在另一端过滤并记录那些可疑的交易。这需要一个交易生成器(因为我们需要数据进行处理)和一个欺诈检测器。这两个应用程序都将在 Docker 容器中运行,并与我们的 Kafka 集群交互。
既然我们已经了解了这个系统的基本思路,那就让我们开始编写一些代码吧?
创建应用框架
我们将首先创建交易生成器和欺诈检测器的容器化框架。它们都是简单的 Python 应用程序,因此我们将Dockerfile在两种情况下使用相同的代码:
# Dockerfile
FROM python:3.6
WORKDIR /usr/app
ADD ./requirements.txt ./
RUN pip install -r requirements.txt
ADD ./ ./
CMD ["python", "app.py"]
我们将创建两个文件夹来存放这些内容Dockerfile:一个用于交易生成器,一个用于欺诈检测器。
我们还会引入一个 Python 依赖项——Kafka Python,它是 Kafka(以及其他服务)的 Python 客户端。我们将使用它来构建与 Kafka 集群交互的应用程序。
# requirements.txt
kafka-python
实际的 Python 代码将位于app.py每个应用程序的文件中。因此,最终我们会得到如下的文件夹结构:
.
├── docker-compose.yml
├── detector
│ ├── Dockerfile
│ ├── app.py
│ └── requirements.txt
└── generator
├── Dockerfile
├── app.py
└── requirements.txt
稍后我们将把这些服务集成到开发环境中。但在此之前,让我先带您了解如何将 Kafka 放入独立的 Docker Compose 文件中,这样我们就可以像使用实际生产集群一样使用它。
隔离 Kafka 集群
在生产环境中,Kafka 始终以服务的形式运行。这意味着 Kafka 集群独立于使用它的应用程序运行。后者只需连接到集群即可发布或读取事件。
我们正在尝试构建一个真实世界的系统,那么我们何不让我们的应用程序也与一个独立的、类似生产环境的 Kafka 集群进行交互呢?
为了在本地实现这一点,我们将把我们的服务迁移zookeeper到broker一个单独的 Docker Compose 配置中。我们把它放在一个docker-compose.kafka.yml文件中。稍后,我们将docker-compose.yml仅对应用程序服务使用常规配置。
# docker-compose.kafka.yml
version: '3'
services:
# ... zookeeper and broker as before ...
你可能觉得这样就足够了,但其实还缺少一些东西。为了让两个 Docker Compose 文件都能访问同一个网络(记住,默认情况下每个 Docker Compose 文件都使用自己的私有网络),我们需要使用一个外部 Docker 网络。我们把它命名为 ` <network_name> kafka-network`。现在就来创建这个网络吧:
$ docker network create kafka-network
85351a30f253b0fb8ae197ea07ddeddd0412c8a789e910dd4a3f424057122d30
现在我们可以添加networks以下部分docker-compose.kafka.yml:
# docker-compose.kafka.yml
version: '3'
services:
# ... zookeeper and broker as before ...
# NEW
networks:
default:
external:
name: kafka-network
(如果您有兴趣,可以阅读更多关于Docker Compose 网络方面的内容)。
太好了,我们现在已经在单独的 Docker Compose 配置中部署了 Kafka 集群。那又怎样呢?
这意味着我们可以将集群视为一个完全隔离的系统。您可以独立于其他 Docker 应用程序启动或关闭它——就像在现实世界中一样。
实际上,要修改这个 Docker Compose 文件,我们需要使用-f标志来指定我们要使用的配置。
例如,要启动 Kafka 集群,您现在将使用:
docker-compose -f docker-compose.kafka.yml up
如果您想访问日志:
docker-compose -f docker-compose.kafka.yml logs broker
你明白我的意思了。
现在,我们可以创建一个docker-compose.yml具有相同网络配置的空实例,稍后我们将向其中添加交易生成器和欺诈检测服务:
# docker-compose.yml
version: '3'
services:
# TODO: generator and detector coming soon
# Give this composition access to the Kafka network
networks:
default:
external:
name: kafka-network
转移话题就到此为止——我想我们一切就绪!让我们开始构建流媒体欺诈检测系统吧。
构建交易生成器
要进行流处理,首先需要一个数据流。这时就需要用到事务生成器了。我们将使用它来创建一个事务流。
在 Kafka 中,您可以构建几种不同类型的应用程序。其中一类应用程序是生产者。
生产者执行我们所需的操作——它们向 Kafka 集群发布消息。这些消息将存储在我们代理服务器上的一个称为“主题”的存储位置——主题是磁盘上用于存储消息的命名存储。
现在,我有个好消息。我们使用的 Python 客户端(Kafka Python)允许我们构建生产者。🎉
所以让我们使用Kafka Python 的生产者 API向transactions主题发送消息。
我很难不在这里复制粘贴一些代码。这不是 Kafka Python 客户端的教程,所以我只会一步一步地演示。如果你想了解更多细节,可以直接参考Kafka Python 的官方文档。
KafkaProducer首先,让我们从 Kafka Python 库中获取它。
from kafka import KafkaProducer
然后,我们从环境变量中获取代理的 URL(最佳实践仍然适用)。生产者使用该 URL 来引导其与 Kafka 集群的连接。
import os
# ...
KAFKA_BROKER_URL = os.environ.get('KAFKA_BROKER_URL')
然后,我们可以实例化实际的生产者。它公开了一个简单的 API(详见文档),用于向 Kafka 主题发送消息。
producer = KafkaProducer(bootstrap_servers=KAFKA_BROKER_URL)
下一步是——我们该如何处理生产者?嗯,我们可以用它来发送消息。发送什么消息呢?问得好!我们暂且跳过这个问题,假设我们发送空字符串。
现在,事务生成器的目标是模拟系统中无限流动的事务流。因此,我们将运行一个无限循环,不断向某个主题发送这些消息。
附注——我喜欢参考《如何粉刷自行车棚:Kafka 主题命名规范》中的建议来命名主题。因此,我将使用queueing.transactions`<transactions>` 作为事务主题名称,它实际上代表了一个待处理事务的队列。
以下是实现这种循环的方法:
from time import sleep
# ...
while True:
message = '' # TODO
# Kafka messages are plain bytes => need to `.encode()` the string message
producer.send('queueing.transactions', value=message.encode())
sleep(1) # Sleep for one second before producing the next transaction
最终,我们得到的 Python 文件如下:
# generator/app.py
import os
from time import sleep
from kafka import KafkaProducer
KAFKA_BROKER_URL = os.environ.get('KAFKA_BROKER_URL')
if __name__ == '__main__':
producer = KafkaProducer(bootstrap_servers=KAFKA_BROKER_URL)
while True:
message = '' # TODO
producer.send('queueing.transactions', value=message.encode())
sleep(1)
现在,让我们把这个generator应用添加到我们的docker-compose.yml配置中:
# docker-compose.yml
services:
# ...
generator:
build: ./generator
environment:
KAFKA_BROKER_URL: broker:9092
在启动此应用程序之前,我们需要用代表交易的实际值填充消息。
为此,我构建了一个实用create_random_transaction()函数,我们可以用它来创建随机交易。这些交易将包含一个source账户、一个target账户、一个amount和currency。
很简单,所以我直接把transactions.py文件复制粘贴过来:
# generator/transactions.py
from random import choices, randint
from string import ascii_letters, digits
account_chars: str = digits + ascii_letters
def _random_account_id() -> str:
"""Return a random account number made of 12 characters."""
return ''.join(choices(account_chars, k=12))
def _random_amount() -> float:
"""Return a random amount between 1.00 and 1000.00."""
return randint(100, 1000000) / 100
def create_random_transaction() -> dict:
"""Create a fake, randomised transaction."""
return {
'source': _random_account_id(),
'target': _random_account_id(),
'amount': _random_amount(),
# Keep it simple: it's all euros
'currency': 'EUR',
}
太好了!我们现在可以用一条虚假的交易信息替换这条空白消息:
from transactions import create_random_transaction
# ...
transaction: dict = create_random_transaction()
message: str = json.dumps(transaction)
producer.send('queueing.transactions', value=message.encode())
# ...
让我们通过增强generator应用程序的可配置性来完成最后的完善工作——这始终是一个很好的做法。让我们:
- 将该
queueing.transactions主题提取到KAFKA_TRANSACTIONS_TOPIC环境变量中 - 将睡眠时间设置为可通过
TRANSACTIONS_PER_SECOND环境变量配置。 - 使用
KafkaProducer'svalue_serializer参数自动将字典中的消息序列化为 JSON 字节。
这是交易生成器的最终版本:
# generator/app.py
import os
import json
from time import sleep
from kafka import KafkaProducer
from transactions import create_random_transaction
KAFKA_BROKER_URL = os.environ.get('KAFKA_BROKER_URL')
TRANSACTIONS_TOPIC = os.environ.get('TRANSACTIONS_TOPIC')
TRANSACTIONS_PER_SECOND = float(os.environ.get('TRANSACTIONS_PER_SECOND'))
SLEEP_TIME = 1 / TRANSACTIONS_PER_SECOND
if __name__ == '__main__':
producer = KafkaProducer(
bootstrap_servers=KAFKA_BROKER_URL,
# Encode all values as JSON
value_serializer=lambda value: json.dumps(value).encode(),
)
while True:
transaction: dict = create_random_transaction()
producer.send(TRANSACTIONS_TOPIC, value=transaction)
print(transaction) # DEBUG
sleep(SLEEP_TIME)
以下是 Docker Compose 服务的样子:
# docker-compose.yml
services:
generator:
build: ./generator
environment:
KAFKA_BROKER_URL: broker:9092
TRANSACTIONS_TOPIC: queueing.transactions
TRANSACTIONS_PER_SECOND: 1000
试用交易生成器
现在,最好是让程序运行起来,并检查一切是否正常。毕竟,如果你自己都无法验证,我又有什么资格告诉你它一定有效呢?
为此,我们首先要确保本地 Kafka 集群已启动,然后启动主 Docker Compose:
$ docker-compose up
这将启动generator生产者。
我们的调试print程序会将 Python 中的交易输出到控制台,但这并不能保证消息能够有效地发送到主题。
为了验证这一点,我们可以使用 Confluent Platform 内置并包含在confluent/kafka-cpDocker 镜像中的脚本。该脚本名为 `confluent_topic_contents` kafka-console-consumer。它允许您读取主题的内容并将其打印到控制台。
该命令需要在容器内运行broker,因此有点晦涩难懂,但命令如下(您需要在单独的终端中运行它):
docker-compose -f docker-compose.kafka.yml exec broker kafka-console-consumer --bootstrap-server localhost:9092 --topic queueing.transactions --from-beginning
该--from-beginning标志表示命令将读取整个主题,而不仅仅是读取新消息。
您可以让它运行一段时间,然后按下Ctrl+C停止键kafka-console-consumer,查看已读消息总数:
...
{"source": "Jpe2zq1QDYTn", "target": "lLbLnl0FSr7T", "amount": 254.83, "currency": "EUR"}
{"source": "mfGWAdLKW8jh", "target": "ede8FQpjBeej", "amount": 785.22, "currency": "EUR"}
{"source": "GufdRdi1TElh", "target": "u0ANirFVu76B", "amount": 407.64, "currency": "EUR"}
^C
Processed a total of 4783 messages
太棒了!它奏效了——虚假交易已成功生成。
您现在可以停止发电机了:Ctrl+C或者$ docker-compose down。
那么,我们目前取得了哪些进展?
- 本地 Kafka 集群已启动并运行🚀
- 一个基于生产者的应用程序,它生成虚假交易并将其发布到 Kafka 主题📝
以下是架构图:
我们现在只差一样东西了——那就是真正的欺诈检测器,它可以处理大量的交易并检测出可疑的交易!
构建欺诈检测器
欺诈检测器是流处理应用程序的典型示例。
它以交易流作为输入,执行某种过滤,然后将结果输出到两个单独的流中——合法的交易流和可疑的交易流,这种操作也称为分支。
除了向主题发布消息的生产者之外,还有另一种 Kafka 应用程序用于从主题中读取消息。它们被称为消费者。实际上,我们已经通过kafka-console-consumer上面的脚本使用了一个消费者!
消费者可以读取一个或多个主题中的消息。实际上,他们订阅这些主题,Kafka 会在消息发布时将其广播给他们。这是使用 Kafka 构建的流式应用程序响应式响应能力的核心特性。
我们又一次幸运了!Kafka Python 客户端允许我们用 Python 构建消费者。我们将使用Kafka Python 的 Consumer API来实现这一点。
不过,欺诈检测器并非普通的消费者,而是一个流式应用程序。因此,它使用消费者读取消息,然后对这些消息进行处理,并将处理后的消息输出到两个输出主题之一。所以我们需要一个消费者和一个生产者。
首先,我们来编写消费者部分。我会一步一步地讲解,最后向您展示最终结果。
让我们先从获取KafkaConsumer:
# detector/app.py
from kafka import KafkaConsumer
让我们从环境变量中获取代理 URL 和交易主题:
import os
# ...
KAFKA_BROKER_URL = os.environ.get('KAFKA_BROKER_URL')
TRANSACTIONS_TOPIC = os.environ.get('TRANSACTIONS_TOPIC')
现在我们可以实例化消费者了。我们知道事务主题包含 JSON 编码的值(由事务生成器生成),因此我们可以使用KafkaConsumer`s`将原始字节自动加载到 Python 字典中。这与我们之前使用`s`value_deserializer类似。KafkaProducervalue_serializer
import json
# ...
consumer = KafkaConsumer(
TRANSACTIONS_TOPIC,
bootstrap_servers=KAFKA_BROKER_URL,
value_deserializer=lambda value: json.loads(value),
)
现在我们可以通过遍历消费者来处理消息流:
for message in consumer:
transaction: dict = message.value
# TODO: determine if transaction is suspicious
# TODO: output transaction in the correct topic (legit or fraud)
现在有一个有趣的问题——我们如何确定一笔交易是否属于欺诈?
有很多选择,这很大程度上取决于业务用例。
在现实世界中,检测欺诈是一个难题,因为交易中有很多迹象表明存在欺诈行为。例如,如果某人在24小时内从50个不同的账户收到50笔金额均为1000欧元的交易,这很可能是可疑的。你甚至可以考虑添加人工智能层来进一步提高预测的准确性。
虽然可行,但用 Kafka 实现这类规则比我们这里能讨论的要复杂得多。我们这里只关注如何使用 Kafka 将流处理方法付诸实践。
为了简化说明,我们得稍微“作弊”一下。假设我们面对的是一个虚构的银行系统,该系统规定单笔转账超过900欧元是违法的。因此,任何amount超过900欧元的交易都可能被视为欺诈。
以下是用于判断交易是否可疑的相应函数:
def is_suspicious(transaction: dict) -> bool:
return transaction['amount'] >= 900
猜猜看——我们现在有办法决定将交易重定向到哪个主题了!
现在我们准备实现检测器的核心部分。为了将消息生成到主题中,我们需要一个KafkaProducer……让我们实例化它——就像我们为交易生成器所做的那样。
from kafka import KafkaProducer
# ...
producer = KafkaProducer(
bootstrap_servers=KAFKA_BROKER_URL,
value_serializer=lambda value: json.dumps(value).encode(),
)
现在我们可以利用is_suspicious()生产者来确定交易应该发送到哪个主题,然后将交易发送到该主题:
import os
# ...
LEGIT_TOPIC = os.environ.get('LEGIT_TOPIC')
FRAUD_TOPIC = os.environ.get('FRAUD_TOPIC')
# ...
for message in consumer:
transaction: dict = message.value
topic = FRAUD_TOPIC if is_suspicious(transaction) else LEGIT_TOPIC
producer.send(topic, value=transaction)
搞定了!我们已经把所有部件连接起来了。
在进行测试之前,这是欺诈检测器的最终版本:
# detector/app.py
import os
import json
from kafka import KafkaConsumer, KafkaProducer
KAFKA_BROKER_URL = os.environ.get('KAFKA_BROKER_URL')
TRANSACTIONS_TOPIC = os.environ.get('TRANSACTIONS_TOPIC')
LEGIT_TOPIC = os.environ.get('LEGIT_TOPIC')
FRAUD_TOPIC = os.environ.get('FRAUD_TOPIC')
def is_suspicious(transaction: dict) -> bool:
"""Determine whether a transaction is suspicious."""
return transaction['amount'] >= 900
if __name__ == '__main__':
consumer = KafkaConsumer(
TRANSACTIONS_TOPIC,
bootstrap_servers=KAFKA_BROKER_URL,
value_deserializer=lambda value: json.loads(value),
)
producer = KafkaProducer(
bootstrap_servers=KAFKA_BROKER_URL,
value_serializer=lambda value: json.dumps(value).encode(),
)
for message in consumer:
transaction: dict = message.value
topic = FRAUD_TOPIC if is_suspicious(transaction) else LEGIT_TOPIC
producer.send(topic, value=transaction)
print(topic, transaction) # DEBUG
以及我们配置中对应的服务docker-compose.yml:
# docker-compose.yml
services:
# ... generator here ...
detector:
build: ./detector
environment:
KAFKA_BROKER_URL: broker:9092
TRANSACTIONS_TOPIC: queueing.transactions
LEGIT_TOPIC: streaming.transactions.legit
FRAUD_TOPIC: streaming.transactions.fraud
试用欺诈检测器
让我们来试一试!和之前一样,我们先启动 Docker Compose,这次它会同时启动交易生成器和欺诈检测器:
$ docker-compose up
就像我们在测试生成器时所做的那样,我们可以通过启动两个kafka-console-consumers(每个输出主题一个)来验证检测器是否正确地消费、处理和生成事务。
以下是执行此操作的命令:
docker-compose -f docker-compose.kafka.yml exec detector kafka-console-consumer --bootstrap-server localhost:9092 --topic streaming.transactions.legit
docker-compose -f docker-compose.kafka.yml exec detector kafka-console-consumer --bootstrap-server localhost:9092 --topic streaming.transactions.fraud
让我们一起来了解一下这些主题的内容吧!
- 其中
streaming.transactions.legit包含合法交易(金额低于 900 欧元),正如预期:
...
{"source": "v16xO8Oj7s4u", "target": "QgvC9nMnBQMO", "amount": 231.67, "currency": "EUR"}
{"source": "wqduJdB5focL", "target": "sxKNYdlSQqQl", "amount": 229.57, "currency": "EUR"}
{"source": "VNWKUvsMOOWA", "target": "7wWynSlnuYpP", "amount": 752.4, "currency": "EUR"}
{"source": "bjt8hci7q8Wm", "target": "LobJtJJB3RDt", "amount": 410.42, "currency": "EUR"}
{"source": "E6NpRQcSJ3es", "target": "E9QeU7qfekKB", "amount": 659.23, "currency": "EUR"}
{"source": "IijVKIGtiGrc", "target": "eWVPvLGm8JLi", "amount": 846.07, "currency": "EUR"}
{"source": "yGfZ1Xa6k1r0", "target": "N5RvY7RO5sQF", "amount": 217.46, "currency": "EUR"}
...
^C
Processed a total of 13593 messages
- 唯一
streaming.transactions.fraud包含可疑交易(金额超过 900 欧元)的记录也在意料之中!
...
{"source": "jwRMZyzGAsmG", "target": "i33smnlpnYtd", "amount": 980.05, "currency": "EUR"}
{"source": "WcmydQNdDxya", "target": "ykFPmUSysJuJ", "amount": 914.81, "currency": "EUR"}
{"source": "UuCOtfIqc4oU", "target": "PrD271ASM3mZ", "amount": 911.9, "currency": "EUR"}
{"source": "ZHdwSxcIi1gv", "target": "CNLyxBAHT2lW", "amount": 925.55, "currency": "EUR"}
{"source": "gQUtarjghjWx", "target": "ABKEhcJuYiEo", "amount": 945.14, "currency": "EUR"}
{"source": "NS2HA0HoMQPt", "target": "h989MZ4lcDqg", "amount": 901.25, "currency": "EUR"}
{"source": "0w22LoewVfYW", "target": "bUejK0tbzLT2", "amount": 955.0, "currency": "EUR"}
...
^C
Processed a total of 1519 messages
如您所见,大约十分之一的消息被标记为欺诈交易。考虑到交易金额在 0 到 1000 欧元之间,触发阈值为 900 欧元,我们预计欺诈交易的比例为 10%,所以一切似乎运行正常!
退后一步
就这样——我们现在有了一个可用的欺诈检测流程:
- 一个本地 Kafka 集群,充当集中式流平台。
- 一个向集群生成交易的交易生成器。
- 欺诈检测器,可处理交易、检测潜在的欺诈交易,并将结果输出到两个不同的主题中。
你可能想知道——接下来会发生什么?
这个系统有很多可以改进或扩展的地方。以下仅列举几个想法:
- 实施更完善的欺诈检测算法
- 将有关欺诈交易的警报或报告发送到某个地方:电子邮件、通知、Slack 机器人——任何方式都可以!
你可能还会问——部署呢?嗯,我故意略过了这部分。虽然在生产环境中部署一组 Kafka 应用应该与你通常的工作流程并无太大区别,但它可能需要遵循自身的最佳实践。另一方面,在生产环境中部署 Kafka 集群则完全是另一个话题(此处双关)。
虽然我们已经成功构建了一个流式系统,但我们甚至还没有触及Kafka 生态系统所能做到的冰山一角。
熟悉 Kafka 生态系统的人可能会想:“KSQL 呢?”、“Kafka Streams 呢?”、“Schema Registry 呢?”。嗯,是的,这些都有,我们以后可能会深入了解一下。
本教程旨在向大家展示Apache Kafka 的核心——主题、生产者和消费者,以及如何安排它们来构建现实世界的流式应用程序。
流媒体播放所有内容
如果你已经阅读并和我一起构建了这个欺诈检测系统——恭喜!你刚刚使用 Apache Kafka 和 Python 构建了你的第一个真正的流式应用程序。
我们可以为我们取得的成就感到自豪。我们结合了 Apache Kafka、Python 和 Docker,解决了一个实际的业务问题。
这只是一个入门示例,但我希望你能开始想象像 Apache Kafka 这样的流式平台如何开启一个全新的系统和应用世界。想想看,如果用 REST API 和传统的 HTTP 调用来构建这个系统,该有多么困难。我猜:根本不可能。
如果你迫不及待地想知道接下来会发生什么,我有个好消息:我们才刚刚开始!👊
下一篇文章,我们将回归 Kafka 的核心概念,并尝试进行一次范式转变。Apache Kafka 的核心在于流式数据处理,因此我们需要从请求思维转变为事件思维。
在此之前,你可以在GitHub上找到完整的代码,所以请继续体验欺诈检测器,并敬请期待更多 Kafka 相关内容!💻
保持联系!
如果你喜欢这篇文章,可以在推特上关注我,获取更新、公告和新闻。🐤
文章来源:https://dev.to/florimondmanca/building-a-streaming-fraud-detection-system-with-kafka-and-python-neg



