These are all typical scenarios where local processing can hold up the overall flow of the system. Moreover, if we run these file-processing units in containers, the available disk space is limited. Hence, we need a cloud-based streaming flow, one that can also parallelize the processing of multiple chunks of the same file by streaming different chunks of it in parallel threads/processes. This is where I came across the AWS S3 Select feature. 😎
response = s3_client.select_object_content(
    Bucket=bucket,
    Key=key,
    ExpressionType='SQL',
    Expression='SELECT * FROM S3Object',
    InputSerialization={'CSV': {'FileHeaderInfo': 'USE', 'FieldDelimiter': ',', 'RecordDelimiter': '\n'}},
    OutputSerialization={'JSON': {'RecordDelimiter': ','}},
)
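The response from select_object_content() is not a plain result set but an event stream. As a rough sketch (assuming the response, s3_client, bucket and key from the snippet above), it can be consumed like this:

records = []
for event in response['Payload']:
    if 'Records' in event:
        # Each 'Records' event carries a bytes payload with serialized result rows
        records.append(event['Records']['Payload'].decode('utf-8'))
    elif 'Stats' in event:
        # The 'Stats' event reports how much data the query scanned and returned
        details = event['Stats']['Details']
        print(f"Scanned {details['BytesScanned']} bytes, returned {details['BytesReturned']} bytes")
print(''.join(records))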
Now that we have some idea of how S3 Select works, let's try to implement our use case of streaming parts (subsets) of a large file, much like how a paginated API works. 😋
S3 Select supports the ScanRange parameter, which streams a subset of an object by specifying a range of bytes to query. S3 Select requests a series of non-overlapping scan ranges, and scan ranges need not be aligned with record boundaries. If a record starts within the specified scan range but extends beyond it, the query still processes it: the query picks up the row that begins inside the scan range and may read past the range's end to fetch the whole row. In other words, it never fetches a subset of a row; it either fetches the whole row or skips it (so it can be fetched in another scan range).
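As a quick illustration (reusing the earlier call, with Start/End byte offsets chosen just for the example), restricting the same query to roughly the first megabyte of the object looks like this:

response = s3_client.select_object_content(
    Bucket=bucket,
    Key=key,
    ExpressionType='SQL',
    Expression='SELECT * FROM S3Object',
    InputSerialization={'CSV': {'FileHeaderInfo': 'USE', 'FieldDelimiter': ',', 'RecordDelimiter': '\n'}},
    OutputSerialization={'JSON': {'RecordDelimiter': ','}},
    ScanRange={'Start': 0, 'End': 1048576},  # only records starting within the first ~1 MB
)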
Let's try to achieve this in two simple steps:
1. Find the total bytes of the S3 file
The following snippet shows a function that performs a HEAD request on our S3 file and determines its size in bytes.
def get_s3_file_size(bucket: str, key: str) -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    file_size = 0
    try:
        response = s3_client.head_object(Bucket=bucket, Key=key)
        if response:
            file_size = int(response.get('ResponseMetadata').get('HTTPHeaders').get('content-length'))
    except ClientError:
        logger.exception(f'Client error reading S3 file {bucket} : {key}')
    return file_size
import ast
import boto3
from botocore.exceptions import ClientError


def stream_s3_file(bucket: str, key: str, file_size: int, chunk_bytes=5000) -> tuple[dict]:
    """Streams a S3 file via a generator.

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path
        file_size (int): Total size of the file in bytes
        chunk_bytes (int): Chunk size in bytes. Defaults to 5000
    Returns:
        tuple[dict]: Returns a tuple of dictionary containing rows of file content
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    expression = 'SELECT * FROM S3Object'
    start_range = 0
    end_range = min(chunk_bytes, file_size)
    while start_range < file_size:
        response = s3_client.select_object_content(
            Bucket=bucket,
            Key=key,
            ExpressionType='SQL',
            Expression=expression,
            InputSerialization={'CSV': {'FileHeaderInfo': 'USE', 'FieldDelimiter': ',', 'RecordDelimiter': '\n'}},
            OutputSerialization={'JSON': {'RecordDelimiter': ','}},
            ScanRange={'Start': start_range, 'End': end_range},
        )
        """
        select_object_content() response is an event stream that can be looped to concatenate the overall result set
        Hence, we are joining the results of the stream in a string before converting it to a tuple of dict
        """
        result_stream = []
        for event in response['Payload']:
            if records := event.get('Records'):
                result_stream.append(records['Payload'].decode('utf-8'))
        yield ast.literal_eval(''.join(result_stream))
        start_range = end_range
        end_range = end_range + min(chunk_bytes, file_size - end_range)


def s3_file_processing():
    bucket = '<s3-bucket>'
    key = '<s3-key>'
    file_size = get_s3_file_size(bucket=bucket, key=key)
    logger.debug(f'Initiating streaming file of {file_size} bytes')
    chunk_size = 524288  # 512KB or 0.5MB
    for file_chunk in stream_s3_file(bucket=bucket, key=key, file_size=file_size, chunk_bytes=chunk_size):
        logger.info(f'\n{30*"*"} New chunk {30*"*"}')
        id_set = set()
        for row in file_chunk:
            # perform any other processing here
            id_set.add(int(row.get('id')))
        logger.info(f'{min(id_set)} --> {max(id_set)}')
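To close the loop on the parallelization idea from the start of this section: since the scan ranges are independent, we can also fetch several of them concurrently instead of one after another. Below is a minimal sketch of that idea, not part of the walkthrough above: fetch_chunk and parallel_s3_file_processing are made-up names, a default boto3 client stands in for the profile-based session, and get_s3_file_size is the step-1 function.

import ast
import boto3
from concurrent.futures import ThreadPoolExecutor


def fetch_chunk(bucket: str, key: str, start: int, end: int) -> tuple:
    """Fetches and parses a single, independent scan range of the CSV file."""
    # A fresh client per call keeps this simple and thread-safe
    s3_client = boto3.client('s3')
    response = s3_client.select_object_content(
        Bucket=bucket,
        Key=key,
        ExpressionType='SQL',
        Expression='SELECT * FROM S3Object',
        InputSerialization={'CSV': {'FileHeaderInfo': 'USE', 'FieldDelimiter': ',', 'RecordDelimiter': '\n'}},
        OutputSerialization={'JSON': {'RecordDelimiter': ','}},
        ScanRange={'Start': start, 'End': end},
    )
    result_stream = [
        event['Records']['Payload'].decode('utf-8')
        for event in response['Payload']
        if 'Records' in event
    ]
    return ast.literal_eval(''.join(result_stream)) if result_stream else ()


def parallel_s3_file_processing(bucket: str, key: str, chunk_bytes: int = 524288, workers: int = 4) -> set:
    file_size = get_s3_file_size(bucket=bucket, key=key)
    # Pre-compute non-overlapping byte ranges covering the whole file
    ranges = [(start, min(start + chunk_bytes, file_size))
              for start in range(0, file_size, chunk_bytes)]
    id_set = set()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(fetch_chunk, bucket, key, start, end) for start, end in ranges]
        for future in futures:
            for row in future.result():
                # Same per-row processing as in s3_file_processing() above
                id_set.add(int(row.get('id')))
    return id_set

Depending on how heavy the per-row processing is, a ProcessPoolExecutor or separate worker processes may be a better fit than threads.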