发布于 2026-01-06 4 阅读
0

通过 S3 Select 高效流式传输大型 AWS S3 文件 AWS S3 Select 演示背景

通过 S3 Select 高效流式传输大型 AWS S3 文件

AWS S3 选择演示

背景

AWS S3 是业界领先的对象存储服务。我们通常会在 S3 上存储大量数据文件,有时需要处理这些文件。如果待处理文件较小,我们可以采用传统的文件处理流程,即从 S3 中获取文件,然后逐行处理。但问题是,如果文件较大呢?> 1GB😓

导入(读取)大文件会导致Out of Memory错误,甚至可能导致系统崩溃。虽然有一些库(例如PandasDask等)非常擅长处理大文件,但前提是文件必须存在于本地,也就是说我们需要将其从 S3 导入到本地计算机。但如果我们不想将整个 S3 文件获取并存储到本地呢?🤔

📜让我们来看一些使用场景:

  • 我们需要每天处理一个大型 CSV S3 文件(约 2GB)。必须在规定的时间范围内(例如 4 小时内)完成处理。
  • 我们需要定期从FTP服务器处理大型S3文件。新文件会按一定的时间间隔到达,并且需要按顺序处理,即必须先处理旧文件,才能开始处理新文件。

这些都是本地处理可能会影响系统整体流程的典型场景。此外,如果我们在容器中运行这些文件处理单元,那么可用的磁盘空间就会受到限制。因此,我们需要一个云流式处理流程(它还可以parallelize the processing of multiple chunks通过在并行线程/进程中流式传输同一文件的不同块来处理同一文件)。我就是在这里发现了这个AWS S3 Select功能。😎

📝 本文重点介绍如何将大文件流式传输成更小的、易于管理的块(顺序传输)。这种方法可以用于并行处理,通过在并发线程/进程中运行来实现。请关注我的下一篇文章,了解更多详情


S3 选择

借助此功能Amazon S3 Select,您可以使用简单的结构化查询语言 (SQL) 语句来筛选 Amazon S3 对象的内容,并仅检索所需的数据子集。使用Amazon S3 Select此功能筛选数据,可以减少 Amazon S3 传输的数据量,从而降低检索数据的成本和延迟。

Amazon S3 Select 可处理以 CSV、JSON 或 Apache Parquet 格式存储的对象。它还支持使用 GZIP 或 BZIP2 压缩的对象(仅限 CSV 和 JSON 对象)以及服务器端加密的对象。您可以指定结果格式为 CSV 或 JSON,并可定义结果中记录的分隔符。

📝 我们将使用 Pythonboto3来实现我们的最终目标。

🧱 构建 SQL 表达式

为了配合使用S3 Selectboto3它提供了`select_object_content()`函数来查询 S3。您需要在请求中将 SQL 表达式传递给 Amazon S3。Amazon S3 Select它支持部分 SQL。请查看此链接了解更多信息



response = s3_client.select_object_content(
    Bucket=bucket,
    Key=key,
    ExpressionType='SQL',
    Expression='SELECT * FROM S3Object',
    InputSerialization={
        'CSV': {
            'FileHeaderInfo': 'USE',
            'FieldDelimiter': ',',
            'RecordDelimiter': '\n'
        }
    },
    OutputSerialization={
        'JSON': {
            'RecordDelimiter': ','
        }
    }
)


Enter fullscreen mode Exit fullscreen mode

在上述请求中,InputSerialization确定 S3 文件类型和相关属性,同时OutputSerialization确定response我们从中获得的结果select_object_content()

🌫️ 流媒体片段

现在,我们已经对它的工作原理有了一些了解S3 Select,让我们尝试实现我们的用例,即以流式传输的方式传输一个大文件的部分(子集),就像这样paginated API works。😋

S3 Select支持ScanRange通过指定要查询的字节范围来流式传输对象子集的参数。S3 Select请求一系列不重叠的扫描范围。扫描范围无需与记录边界对齐。如果记录的起始位置在指定的扫描范围内,但超出扫描范围,查询仍会处理该记录。这意味着查询会在扫描范围内获取该行,并可能扩展到获取整行。最终doesn't fetch a subset of a row,要么获取整行,要么跳过该行(以便在另一个扫描范围内获取)。

让我们尝试用两个简单的步骤来实现这个目标:

1. 查找 S3 文件的总字节数

以下代码片段展示了对我们的 S3 文件执行HEAD请求并确定文件大小(以字节为单位)的函数。



def get_s3_file_size(bucket: str, key: str) -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    file_size = 0
    try:
        response = s3_client.head_object(Bucket=bucket, Key=key)
        if response:
            file_size = int(response.get('ResponseMetadata').get('HTTPHeaders').get('content-length'))
    except ClientError:
        logger.exception(f'Client error reading S3 file {bucket} : {key}')
    return file_size


Enter fullscreen mode Exit fullscreen mode

2. 创建一个生成器来流式传输数据块

现在,逻辑是分块输出 S3 文件的字节流,直到达到文件大小限制。请放心,这种连续扫描不会导致响应中出现行重叠 😉(请查看输出图像/GitHub 仓库)。很简单,对吧?😝



import ast
import boto3
from botocore.exceptions import ClientError

def stream_s3_file(bucket: str, key: str, file_size: int, chunk_bytes=5000) -> tuple[dict]:
    """Streams a S3 file via a generator.

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path
        chunk_bytes (int): Chunk size in bytes. Defaults to 5000
    Returns:
        tuple[dict]: Returns a tuple of dictionary containing rows of file content
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    expression = 'SELECT * FROM S3Object'
    start_range = 0
    end_range = min(chunk_bytes, file_size)
    while start_range < file_size:
        response = s3_client.select_object_content(
            Bucket=bucket,
            Key=key,
            ExpressionType='SQL',
            Expression=expression,
            InputSerialization={
                'CSV': {
                    'FileHeaderInfo': 'USE',
                    'FieldDelimiter': ',',
                    'RecordDelimiter': '\n'
                }
            },
            OutputSerialization={
                'JSON': {
                    'RecordDelimiter': ','
                }
            },
            ScanRange={
                'Start': start_range,
                'End': end_range
            },
        )

        """
        select_object_content() response is an event stream that can be looped to concatenate the overall result set
        Hence, we are joining the results of the stream in a string before converting it to a tuple of dict
        """
        result_stream = []
        for event in response['Payload']:
            if records := event.get('Records'):
                result_stream.append(records['Payload'].decode('utf-8'))
        yield ast.literal_eval(''.join(result_stream))
        start_range = end_range
        end_range = end_range + min(chunk_bytes, file_size - end_range)


def s3_file_processing():
    bucket = '<s3-bucket>'
    key = '<s3-key>'
    file_size = get_s3_file_size(bucket=bucket, key=key)
    logger.debug(f'Initiating streaming file of {file_size} bytes')
    chunk_size = 524288  # 512KB or 0.5MB
    for file_chunk in stream_s3_file(bucket=bucket, key=key,
                                     file_size=file_size, chunk_bytes=chunk_size):
        logger.info(f'\n{30 * "*"} New chunk {30 * "*"}')
        id_set = set()
        for row in file_chunk:
            # perform any other processing here
            id_set.add(int(row.get('id')))
        logger.info(f'{min(id_set)} --> {max(id_set)}')


Enter fullscreen mode Exit fullscreen mode

流式文件输出

恭喜!👏 我们成功解决了处理大型 S3 文件而不导致系统崩溃的关键挑战之一。🤘

📌 您可以查看我的 GitHub 代码库,其中包含此方法的完整工作示例 👇

GitHub 标志 伊德里斯·兰普拉瓦拉/ s3-select-demo

本项目展示了 AWS S3 Select 的强大功能,可以分页方式流式传输大型数据文件。

AWS S3 选择演示

MIT许可证

AWS S3 Select该项目展示了在流式传输大型数据文件的丰富功能paginated style

目前S3 Select不支持分页OFFSET,因此我们无法对查询结果进行分页。因此,我们使用scanrange流式传输功能来传输 S3 文件的内容。

背景

导入(读取)大文件会导致Out of Memory错误,甚至可能导致系统崩溃。虽然有一些库(例如 Pandas、Dask 等)非常擅长处理大文件,但前提是文件必须存在于本地,也就是说我们需要将其从 S3 导入到本地计算机。但如果我们不想一次性将整个 S3 文件获取并存储到本地呢?🤔

我们可以利用AWS S3 Select它的参数来传输大型文件ScanRange。这种方法……





✔️ 使用 S3-Select 的好处

  • 减少 I/O——从而提高性能
  • 数据传输费用降低,从而降低了成本
  • ScanRange使用多个线程/进程可以并行运行多个数据块,从而加快文件处理速度。

❗ S3 Select 的局限性

  • 输入或结果中单条记录的最大长度为 1 MB
  • Amazon S3 Select 只能使用 JSON 输出格式输出嵌套数据。
  • S3 select 返回一个编码字节流,因此我们需要遍历返回的字节流并解码输出。records['Payload'].decode('utf-8')
  • 仅适用于以 CSV、JSON 或 Apache Parquet 格式存储的对象。如需更多灵活性/功能,您可以选择AWS Athena。

📑 资源


您可能还想阅读本文的续篇👇


下次见!😋
文章来源:https://dev.to/idrisrampurawala/efficiently-streaming-a-large-aws-s3-file-via-s3-select-4on