发布于 2026-01-06 4 阅读
0

如何使用 Python 创建高性能 API 网关 DEV 的全球展示挑战赛,由 Mux 呈现:展示你的项目!

如何使用 Python 创建高性能 API 网关

由 Mux 主办的 DEV 全球展示挑战赛:展示你的项目!

如何使用 Python 创建高性能 API 网关

如何使用 Python 创建高性能 API 网关

在当今世界,API 已成为现代应用程序不可或缺的一部分。它们允许不同的服务相互通信和交换数据,从而构建复杂的系统。然而,管理 API 请求可能极具挑战性,尤其是在处理多个 API 服务器时。本文将探讨如何构建一个网关服务器,该服务器能够处理 API 请求并将其分发到不同的服务器。

介绍

网关服务器是一个集中式服务器,它接收来自客户端的请求,并将这些请求分发给不同的 API 服务器。网关服务器充当客户端和 API 服务器之间的中介,负责处理身份验证、速率限制和响应缓存。使用网关服务器的优势在于,它简化了 API 请求的管理,并为客户端提供了一个统一的入口点。

本文将重点介绍如何使用 Python 和 FastAPI 构建网关服务器。我们将使用 asyncio 处理异步请求,并使用 Redis 进行缓存。

构建网关服务器

为了构建我们的网关服务器,我们将使用 FastAPI,这是一个用于使用 Python 构建 API 的现代 Web 框架。我们将定义一个接收 API 请求并将其分发到不同 API 服务器的端点。我们还将实现速率限制、身份验证和响应缓存。

我们的终点定义如下:


@app.get("/api/v1/{path:path}", include_in_schema=False)
@auth_and_rate_limit
async def v1_gateway(request: Request, path: str):
    """
    NOTE: In order for the gateway server to work properly it needs at least 2 GIG or RAM
        master router
    :param request:
    :param path:
    :return:
    """
Enter fullscreen mode Exit fullscreen mode

此端点接收带有路径参数的请求,该参数是客户端想要访问的 API 端点的 URL。我们使用 `auth_and_rate_limit` 装饰器来强制执行速率限制和身份验证。

接下来,我们检查请求中是否包含 API 密钥。如果包含,则将其传递给 create_take_credit_args 函数,该函数负责处理 API 密钥的身份验证。

api_key: dict = request.query_params.get('api_key')
_path = f"/api/v1/{path}"
await create_take_credit_args(api_key=api_key, path=_path)

Enter fullscreen mode Exit fullscreen mode

身份验证完成后,我们会创建一个 API 服务器列表,并将请求分发到这些服务器。

api_urls = [f'{api_server_url}/api/v1/{path}' for api_server_url in api_server_urls]

Enter fullscreen mode Exit fullscreen mode

然后,我们检查请求的响应是否已缓存在 Redis 中。如果已缓存,则返回缓存的响应。否则,我们将请求发送到所有 API 服务器并等待它们的响应。

tasks = [redis_cache.get(key=api_url, timeout=60*5) for api_url in api_urls]
cached_responses = await asyncio.gather(*tasks)

for i, response in enumerate(cached_responses):
    if response is not None:
        app_logger.info(msg=f"Found cached response from {api_urls[i]}")
        return JSONResponse(content=response, status_code=200, headers={"Content-Type": "application/json"})

try:

    # 5 minutes timeout on resource fetching from backend - some resources may take very long
    tasks = [requester(api_url=api_url, timeout=300) for api_url in api_urls]
    responses = await asyncio.gather(*tasks)

except asyncio.CancelledError:
    responses = []
except httpx.HTTPError as http_err:
    responses = []
Enter fullscreen mode Exit fullscreen mode

如果收到来自任何 API 服务器的响应,我们会将其缓存到 Redis 中并返回给客户端。


    app_logger.info(msg=f"Request Responses returned : {len(responses)}")
    for i, response in enumerate(responses):
        if response and response.get("status", False):
            api_url = api_urls[i]
            # NOTE, Cache is being set to a ttl of one hour here
            await redis_cache.set(key=api_url, value=response, ttl=60 * 60)
            app_logger.info(msg=f"Server Responded for this Resource {api_url}")
            return JSONResponse(content=response, status_code=200, headers={"Content-Type": "application/json"})
        else:
            # The reason for this algorithm is because sometimes the cron server is busy this way no matter
            # what happens a response is returned
            app_logger.warning(msg=f"""
            Server Failed To Respond - Or Data Not Found
                Original Request URL : {api_urls[i]}
                Actual Response : {response}          
            """)

    mess = "All API Servers failed to respond - Or there is no Data for the requested resource and parameters"
    app_logger.warning(msg=mess)
    # TODO - send Notifications to developers that the API Servers are down - or something requests coming up empty handed
    _time = datetime.datetime.now().isoformat(sep="-")

    # TODO - create Dev Message Types - Like Fatal Errors, and etc also create Priority Levels
    _args = dict(message_type="resource_not_found", request=request, api_key=api_key)
    await email_process.send_message_to_devs(**_args)
    return JSONResponse(content={"status": False, "message": mess}, status_code=404,
                        headers={"Content-Type": "application/json"})


Enter fullscreen mode Exit fullscreen mode

如果后端服务器未响应,该函数会记录一条警告消息,指出所有 API 服务器均未响应,或者请求的资源和参数没有数据。然后,它会使用 `email_process.send_message_to_devs` 函数向开发人员发送通知,并返回一个状态码为 404、`Content-Type` 标头为 `application/json` 的 JSON 响应。该响应包含一个值为 False 的 `status` 字段和一个包含错误消息的 `message` 字段。

if response and response.get("status", False):

Enter fullscreen mode Exit fullscreen mode

如果从后端服务器收到响应,且该响应包含设置为 True 的“status”字段,则使用“redis_cache.set”函数缓存该响应,并立即返回状态码为 200 且“Content-Type”标头为“application/json”的响应。否则,该函数会记录一条警告消息,指出服务器未响应或未找到请求的数据。

*结论 *

总之,v1 网关 API 端点提供了一种强大而可靠的方式,可以从多个 API 服务器检索数据。

它利用缓存来减轻 API 服务器的负载,并提高频繁请求资源的响应速度。

它还实现了速率限制和身份验证,以确保系统的安全性和稳定性。

如果所有 API 服务器均无响应或未返回任何数据,系统将向开发人员发送通知,并向客户端返回相应的错误消息。

总而言之,这个 API 端点是设计和实现适用于现代 Web 应用程序的可扩展、高弹性 API 网关的一个很好的例子。

上述网关已实现为 EOD 股票 API 的网关,可在此处找到。

如果您正在寻找以下解决方案,它可为您的网站和应用程序提供可扩展的解决方案:

  1. 交流信息
  2. 股票代码数据
  3. 每日收盘(EOD)股票数据
  4. 基本数据
  5. 股票期权和拆分数据
  6. 金融新闻API
  7. 社交媒体趋势数据与股票市场
  8. 新闻与社交媒体的情感分析
文章来源:https://dev.to/freelancingsolutions/how-to-create-a-high-performant-api-gateway-using-python-4059