添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
使用 Python 和 Boto3 对 Amazon DynamoDB 进行编程 - Amazon DynamoDB

使用 Python 和 Boto3 对 Amazon DynamoDB 进行编程

本指南为想要结合 Python 使用 Amazon DynamoDB 的程序员提供了指导。了解不同的抽象层、配置管理、错误处理、控制重试策略、管理 keep-alive 等。

Boto 简介

您可以使用官方适用于 Python 的 AWS SDK(通常称为 Boto3 )从 Python 访问 DynamoDB。Boto 这个名字(发音为 boh-toh)来自一种原产于亚马逊河的淡水海豚。Boto3 库是该库的第三个主要版本,于 2015 年首次发布。Boto3 库非常大,因为它支持所有 AWS 服务,而不仅仅是 DynamoDB。此指导仅针对 Boto3 中与 DynamoDB 相关的部分。

Boto 由 AWS 作为 GitHub 上托管的开源项目进行维护和发布。它分为两个软件包: Botocore Boto3

Botocore 提供了低级别功能。在 Botocore 中,您将找到客户端、会话、凭证、配置和异常类。

Boto3 基于 Botocore 构建。它提供了更高级别、更具 Python 风格的接口。具体而言,它将 DynamoDB 表作为资源公开,与较低级别的面向服务的客户端接口相比,它提供了更简单、更优雅的接口。

由于这些项目托管在 GitHub 上,因此您可以查看源代码、跟踪未解决的问题或提交自己的问题。

使用 Boto 文档

通过以下资源开始学习 Boto 文档:

快速入门部分 开始,该部分为软件包安装提供了可靠的起点。如果尚未安装 Boto3,请前往那里获取有关安装 Boto3 的说明(Boto3 通常会在诸如 AWS Lambda 的 AWS 服务中自动提供)。

之后,请关注文档的 DynamoDB 指南 。它向您展示了如何执行基本的 DynamoDB 活动:创建和删除表、操作项目、运行批量操作、运行查询和执行扫描。其示例使用 资源 接口。如果看到 boto3.resource('dynamodb') ,表明您正在使用较高级别的 资源 接口。

阅读完指南后,您可以查看 DynamoDB 参考 。此登录页面提供了可供您使用的类和方法的详尽列表。在顶部,您将看到 DynamoDB.Client 类。这提供了对所有控制面板和数据面板操作的低级别访问。在底部,请看 DynamoDB.ServiceResource 类。这是较高级别的 Python 风格的接口。使用它,您可以创建表、对表执行批量操作或获取表特定操作的 DynamoDB.ServiceResource.Table 实例。

了解客户端和资源抽象层

您将使用的两个接口是 客户端 接口和 资源 接口。

低级别 客户端 接口提供与底层服务 API 的一对一映射。DynamoDB 提供的每个 API 都可通过客户端获得。这意味着客户端接口可以提供完整的功能,但使用起来往往更加冗长且复杂。

更高级别的 资源 接口不提供底层服务 API 的一对一映射。但是,它提供了一些方法,让您能够更方便地访问 batch_writer 等服务。

以下是使用客户端接口插入项目的示例。请注意所有值是如何以映射形式传递的,键表示它们的类型(“S”代表字符串,“N”代表数字),它们的值作为字符串。这被称为 DynamoDB JSON 格式。

import boto3 dynamodb = boto3.client('dynamodb') dynamodb.put_item( TableName='YourTableName', Item={ 'pk': {'S': 'id#1'}, 'sk': {'S': 'cart#123'}, 'name': {'S': 'SomeName'}, 'inventory': {'N': '500'}, # ... more attributes ...

以下是使用资源接口的相同 PutItem 操作。数据输入是隐式的:

import boto3 dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('YourTableName') table.put_item( Item={ 'pk': 'id#1', 'sk': 'cart#123', 'name': 'SomeName', 'inventory': 500, # ... more attributes ...

如果需要,您可以使用随 boto3 提供的 TypeSerializerTypeDeserializer 类在常规 JSON 和 DynamoDB JSON 之间进行转换:

def dynamo_to_python(dynamo_object: dict) -> dict: deserializer = TypeDeserializer() return { k: deserializer.deserialize(v) for k, v in dynamo_object.items() def python_to_dynamo(python_object: dict) -> dict: serializer = TypeSerializer() return { k: serializer.serialize(v) for k, v in python_object.items()

下面展示了如何使用客户端接口执行查询。它将查询表示为 JSON 构造。它使用 KeyConditionExpression 字符串,该字符串需要变量替换来处理任何潜在的关键字冲突:

import boto3 client = boto3.client('dynamodb') # Construct the query response = client.query( TableName='YourTableName', KeyConditionExpression='pk = :pk_val AND begins_with(sk, :sk_val)', FilterExpression='#name = :name_val', ExpressionAttributeValues={ ':pk_val': {'S': 'id#1'}, ':sk_val': {'S': 'cart#'}, ':name_val': {'S': 'SomeName'}, ExpressionAttributeNames={ '#name': 'name',

使用资源接口的相同查询操作可以缩短和简化:

import boto3 from boto3.dynamodb.conditions import Key, Attr dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('YourTableName') response = table.query( KeyConditionExpression=Key('pk').eq('id#1') & Key('sk').begins_with('cart#'), FilterExpression=Attr('name').eq('SomeName')

最后一个例子,假设您想得到一个表的大致大小(这是保存在表中的元数据,大约每 6 小时更新一次)。使用客户端接口,您可以执行 describe_table() 操作并从返回的 JSON 结构中提取答案:

import boto3 dynamodb = boto3.client('dynamodb') response = dynamodb.describe_table(TableName='YourTableName') size = response['Table']['TableSizeBytes']

通过资源接口,表隐式执行描述操作,并将数据直接作为属性呈现:

import boto3 dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('YourTableName') size = table.table_size_bytes
注意

在考虑使用客户端接口还是资源接口进行开发时,请注意,根据以下资源文档,不会向资源接口添加新特征:“AWS Python SDK 团队不打算向 boto3 的资源接口中添加新特征。现有接口将在 boto3 的生命周期内继续使用。客户可以通过客户端接口访问更新的服务特征。”

使用表资源 batch_writer

只有较高级别的表资源才有的一种便利就是 batch_writer。DynamoDB 支持批量写入操作,允许在一个网络请求中执行多达 25 个放置或删除操作。像这样的批处理可以最大限度地减少网络往返行程,从而提高效率。

使用低级别客户端库,您可以使用 client.batch_write_item() 操作来运行批处理。您必须手动将工作分成 25 个批次。每次操作后,您还必须请求接收未处理的项目列表(有些写入操作可能成功,而有些可能失败)。然后,您必须将这些未处理的项目再次传递到以后的 batch_write_item() 操作中。有大量的样板代码。

Table.batch_writer 方法创建了一个上下文管理器,用于批量写入对象。它提供了一个接口,在这个接口中,您好像是一次写入一个项目,但在内部,它是缓冲并批量发送项目。此外,它还会隐式处理未处理的项目重试。

dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('YourTableName') movies = # long list of movies in {'pk': 'val', 'sk': 'val', etc} format with table.batch_writer() as writer: for movie in movies: writer.put_item(Item=movie)

探索客户端和资源层的其它代码示例

您还可以参考以下代码示例存储库,这些存储库使用客户端和资源探索各种函数的用法:

了解客户端和资源对象如何与会话和线程交互

资源对象并非线程安全的,不应跨线程或进程共享。有关更多详细信息,请参阅资源指南

相比之下,客户端对象通常是线程安全的,但特定的高级特征除外。有关更多详细信息,请参阅客户端指南

会话对象不是线程安全的。因此,每次在多线程环境中创建客户端或资源时,都应先创建一个新会话,然后从会话中创建客户端或资源。有关更多详细信息,请参阅会话指南

如果调用 boto3.resource(),则隐式使用默认会话。这便于编写单线程代码。在编写多线程代码时,您需要先为每个线程构造一个新的会话,然后从该会话中检索资源:

# Explicitly create a new Session for this thread session = boto3.Session() dynamodb = session.resource('dynamodb')

自定义配置对象

在构造客户端或资源对象时,您可以传递可选的命名参数来自定义行为。名为 config 的参数可以解锁各种功能。它是 botocore.client.Config 的一个实例,配置参考文档展示了它公开供您控制的所有内容。配置指南提供了一个很好的概述。

注意

您可以在 AWS 配置文件内的会话级别,或作为环境变量修改其中的许多行为设置。

自定义配置的一个用途是调整网络行为:

connect_timeout(浮点或整数)– 尝试建立连接时引发超时异常之前的时间(以秒为单位)。默认值为 60 秒。

read_timeout(浮点或整数)– 尝试从连接中读取时引发超时异常之前的时间(以秒为单位)。默认值为 60 秒。

对于 DynamoDB 来说,60 秒的超时时间过长。这意味着暂时性网络故障会导致客户端延迟一分钟,然后才能重试。以下代码将超时时间缩短到一秒:

import boto3 from botocore.config import Config my_config = Config( connect_timeout = 1.0, read_timeout = 1.0 dynamodb = boto3.resource('dynamodb', config=my_config)

有关超时的更多讨论,请参阅调整延迟感知型 DynamoDB 应用程序的 AWS Java SDK HTTP 请求设置。注意,Java SDK 的超时配置比 Python 多。

keep-alive 配置

如果您使用的是 botocore 1.27.84 或更高版本,您也可以控制 TCP Keep-Alive

tcp_keepalive(布尔值)- 如果设置为 True(默认为 False),则启用创建新连接时使用的 TCP Keep-Alive 套接字选项。这仅支持 botocore 1.27.84 及更高版本。

将 TCP Keep-Alive 设置为 True 可以减少平均延迟。以下是当您拥有合适的 botocore 版本时,有条件地将 TCP Keep-Alive 设置为 true 的示例代码:

import botocore import boto3 from botocore.config import Config from distutils.version import LooseVersion required_version = "1.27.84" current_version = botocore.__version__ my_config = Config( connect_timeout = 0.5, read_timeout = 0.5 if LooseVersion(current_version) > LooseVersion(required_version): my_config = my_config.merge(Config(tcp_keepalive = True)) dynamodb = boto3.resource('dynamodb', config=my_config)
注意

TCP Keep-Alive 与 HTTP Keep-Alive 不同。使用 TCP Keep-Alive,底层操作系统通过套接字连接发送小数据包,以保持连接处于活动状态并立即检测到任何丢包。使用 HTTP Keep-Alive,在底层套接字上构建的 Web 连接可以重复使用。使用 boto3 时,HTTP Keep-Alive 始终处于启用状态。

空闲连接可以保持活动状态的时间是有限制的。如果您有一个空闲的连接,但希望下次请求使用已经建立的连接,可以考虑定期发送请求(比如每分钟发送一次)。

该配置还接受名为 retries 的字典,您可以在其中指定所需的重试行为。当 SDK 收到错误且错误属于临时类型时,SDK 中会发生重试。如果在内部重试错误(并且重试最终生成成功响应),则从调用代码的角度来看,没有错误,只是延迟略有增加。以下是您可以指定的值:

max_attempts – 一个整数,表示单个请求将进行的最大重试次数。例如,将此值设置为 2 将导致请求在初始请求之后最多重试两次。将此值设置为 0 将导致在初始请求之后不会尝试任何重试。

total_max_attempts – 一个整数,表示单个请求将进行的最大总尝试次数。这包括初始请求,因此值为 1 表示不会重试任何请求。如果同时提供 total_max_attemptsmax_attempts,则 total_max_attempts 优先。total_max_attempts 之所以优先于 max_attempts,是因为它映射到 AWS_MAX_ATTEMPTS 环境变量和 max_attempts 配置文件值。

mode – 一个字符串,表示 botocore 应使用的重试模式类型。有效值为:

legacy - 默认模式。首次重试等待 50 毫秒,然后使用基准因子为 2 的指数回退。对于 DynamoDB,除非使用上述值覆盖,否则它最多总共可执行 10 次尝试。

注意

使用指数回退,最后一次尝试将等待几乎 13 秒。

standard – 之所以命名为标准版,是因为它与其它 AWS SDK 更加一致。对于首次重试,随机等待 0 毫秒到 1000 毫秒不等。如果需要再次重试,它会从 0 毫秒到 1000 毫秒之间随机选择另一个时间,然后将其乘以 2。如果需要更多重试,它会进行相同的随机选择,然后乘以 4,依此类推。每次等待的上限为 20 秒。与 legacy 模式相比,此模式将对更多检测到的故障条件执行重试。对于 DynamoDB,除非使用上述值覆盖,否则它最多总共可执行 3 次尝试。

adaptive - 一种实验性重试模式,包括标准模式的所有功能,但添加了自动客户端限制。通过自适应速率限制,SDK 可以降低请求的发送速率,以便更好地容纳 AWS 服务的容量。这是一种临时模式,其行为可能会发生变化。

这些重试模式的扩展定义可以在重试指南中找到,也可以在 SDK 参考的“重试行为”主题中找到。

以下示例明确使用 legacy 重试策略,请求总数最多为 3 次(2 次重试):

import boto3 from botocore.config import Config my_config = Config( connect_timeout = 1.0, read_timeout = 1.0, retries = { 'mode': 'legacy', 'total_max_attempts': 3 dynamodb = boto3.resource('dynamodb', config=my_config)

由于 DynamoDB 是一个高可用、低延迟的系统,因此您可能希望在重试速度上比内置重试策略所允许的更激进。您可以实现自己的重试策略,方法是将最大尝试次数设置为 0,自己捕获异常,然后根据自己的代码酌情重试,而不是依赖 boto3 进行隐式重试。

如果您管理自己的重试策略,则需要区分节流和错误:

节流(由 ProvisionedThroughputExceededExceptionThrottlingException 表示)表示服务运行正常,它会通知您已超出 DynamoDB 表或分区的读取或写入容量。每过一毫秒,就会有多一点的读取或写入容量可用,因此您可以快速重试(例如每 50 毫秒重试一次),尝试访问新释放的容量。使用节流,您并不特别需要指数回退,因为节流属于轻量级,DynamoDB 可以返回,而且不会向您收取每次请求的费用。指数回退会将更长的延迟分配给已经等待最长时间的客户端线程,从统计学上讲,将超越 p50 和 p99。

错误(由 InternalServerErrorServiceUnavailable 等表示)表示服务存在暂时性问题。这可以是针对整个表,也可以只是您正在读取或写入的分区。使用错误,您可以在重试前暂停更长时间(例如 250 毫秒或 500 毫秒),并使用抖动来错开重试。

最大池连接数配置

最后,配置允许您控制连接池大小:

max_pool_connections (int) – 连接池中要保留的最大连接数。如果未指定值,则使用默认值 10。

此选项控制要保留在池中以供重复使用的最大 HTTP 连接数。每个会话保留一个不同的池。如果您预计会有超过 10 个线程使用基于同一会话构建的客户端或资源,则应考虑提高该值,这样线程就不必等待使用池连接的其它线程。

import boto3 from botocore.config import Config my_config = Config( max_pool_connections = 20 # Setup a single session holding up to 20 pooled connections session = boto3.Session(my_config) # Create up to 20 resources against that session for handing to threads # Notice the single-threaded access to the Session and each Resource resource1 = session.resource('dynamodb') resource2 = session.resource('dynamodb') # etc

Boto3 中并未全部静态定义 AWS 服务异常。这是因为 AWS 服务的错误和异常差异很大,并且可能会发生变化。Boto3 将所有服务异常封装为 ClientError,并以结构化 JSON 公开详细信息。例如,错误响应的结构可能如下所示:

'Error': { 'Code': 'SomeServiceException', 'Message': 'Details/context around the exception or error' 'ResponseMetadata': { 'RequestId': '1234567890ABCDEF', 'HostId': 'host ID data will appear here as a hash', 'HTTPStatusCode': 400, 'HTTPHeaders': {'header metadata key/values will appear here'}, 'RetryAttempts': 0

以下代码会捕获任何 ClientError 异常,并查看 ErrorCode 的字符串值来确定要采取的操作:

import botocore import boto3 dynamodb = boto3.client('dynamodb') response = dynamodb.put_item(...) except botocore.exceptions.ClientError as err: print('Error Code: {}'.format(err.response['Error']['Code'])) print('Error Message: {}'.format(err.response['Error']['Message'])) print('Http Code: {}'.format(err.response['ResponseMetadata']['HTTPStatusCode'])) print('Request ID: {}'.format(err.response['ResponseMetadata']['RequestId'])) if err.response['Error']['Code'] in ('ProvisionedThroughputExceededException', 'ThrottlingException'): print("Received a throttle") elif err.response['Error']['Code'] == 'InternalServerError': print("Received a server error") else: raise err

一些(但不是全部)异常代码已被具体化为顶级类。您可以选择直接处理这些类。使用客户端接口时,这些异常会在您的客户端上动态填充,您可以使用客户端实例捕获这些异常,如下所示:

except ddb_client.exceptions.ProvisionedThroughputExceededException:

使用资源接口时,必须使用 .meta.client 从资源遍历到底层客户端才能访问异常,如下所示:

except ddb_resource.meta.client.exceptions.ProvisionedThroughputExceededException:

要查看具体化异常类型的列表,可以动态生成列表:

ddb = boto3.client("dynamodb") print([e for e in dir(ddb.exceptions) if e.endswith('Exception') or e.endswith('Error')])

使用条件表达式执行写入操作时,您可以请求:如果表达式失败,则应在错误响应中返回该项目的值。

response = table.put_item( Item=item, ConditionExpression='attribute_not_exists(pk)', ReturnValuesOnConditionCheckFailure='ALL_OLD' except table.meta.client.exceptions.ConditionalCheckFailedException as e: print('Item already exists:', e.response['Item'])

要进一步了解错误处理和异常,请执行以下操作:

logging.basicConfig(level=logging.INFO)

这会将根记录器配置为记录 INFO 及更高级别的消息。严重程度低于这些级别的日志消息将被忽略。日志记录级别包括 DEBUGINFOWARNINGERRORCRITICAL。默认为 WARNING

Boto3 中的记录器是分层的。该库使用几个不同的记录器,每个记录器对应库的不同部分。您可以分别控制每个记录器的行为:

botocore.parsers:用于在解析 AWS 服务响应之前对其进行记录。

botocore.retryhandler:用于记录 AWS 服务请求重试的处理情况(传统模式)。

botocore.retries.standard:用于记录 AWS 服务请求重试的处理情况(标准模式或自适应模式)。

botocore.utils:用于记录库中的其它活动。

botocore.waiter:用于记录 Waiter 的功能,后者轮询 AWS 服务直到达到特定状态。

其它库也记录。在内部,boto3 使用第三方 urllib3 进行 HTTP 连接处理。如果延迟很重要,您可以通过查看 urllib3 何时建立新连接或关闭空闲连接来查看其日志,以确保您的池得到充分利用。

urllib3.connectionpool:用于记录连接池处理事件。

以下代码段将端点和连接池活动的大部分记录设置为 INFODEBUG

import logging logging.getLogger('boto3').setLevel(logging.INFO) logging.getLogger('botocore').setLevel(logging.INFO) logging.getLogger('botocore.endpoint').setLevel(logging.DEBUG) logging.getLogger('urllib3.connectionpool').setLevel(logging.DEBUG)

Botocore 在其执行的各个阶段都会发出事件。您可以为这些事件注册处理程序,这样无论何时发出事件,您的处理程序都会被调用。这使您无需修改内部结构即可扩展 botocore 的行为。

例如,假设您要跟踪应用程序中的任何 DynamoDB 表上每次调用 PutItem 操作的情况。每次在关联的会话上调用 PutItem 操作时,您都可以在 'provide-client-params.dynamodb.PutItem' 事件上注册以捕获并记录。示例如下:

import boto3 import botocore import logging def log_put_params(params, **kwargs): if 'TableName' in params and 'Item' in params: logging.info(f"PutItem on table {params['TableName']}: {params['Item']}") logging.basicConfig(level=logging.INFO) session = boto3.Session() event_system = session.events # Register our interest in hooking in when the parameters are provided to PutItem event_system.register('provide-client-params.dynamodb.PutItem', log_put_params) # Now, every time you use this session to put an item in DynamoDB, # it will log the table name and item data. dynamodb = session.resource('dynamodb') table = dynamodb.Table('YourTableName') table.put_item( Item={ 'pk': '123', 'sk': 'cart#123', 'item_data': 'YourItemData', # ... more attributes ...

在处理程序中,您甚至可以通过编程方式操作参数来改变行为:

params['TableName'] = "NewTableName"

有关事件的更多信息,请参阅事件的 botocore 文档事件的 boto3 文档

分页和分页工具

某些请求(例如查询和扫描)会限制针对单个请求返回的数据大小,并要求您重复请求才能显示后续页面。

您可以使用 limit 参数控制每页可读取的最大项目数。例如,如果您想要读取最后 10 个项目,则可以使用 limit 仅检索最后 10 个项目。请注意,限制是在应用任何筛选条件之前应从表中读取多少项目。筛选后无法确切指定想要检索 10 个项目;只有在切实检索到 10 个项目后,您才能控制预先筛选的数量并检查客户端。不管限制如何,每个响应始终有 1 MB 的最大大小。

如果响应包含 LastEvaluatedKey,则表示响应已结束,因为它达到了计数或大小限制。此密钥是针对该响应评估的最后一个密钥。您可以检索此 LastEvaluatedKey 并将其作为 ExclusiveStartKey 传递给后续调用,以便从该起点读取下一个数据块。如果未返回 LastEvaluatedKey,则表示没有其它与查询或扫描匹配的项目。

以下是一个简单的示例(使用资源接口,但客户端接口具有相同的模式),它每页最多读取 100 个项目,并循环操作,直到读取完所有项目。

import boto3 dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('YourTableName') query_params = { 'KeyConditionExpression': Key('pk').eq('123') & Key('sk').gt(1000), 'Limit': 100 while True: response = table.query(**query_params) # Process the items however you like for item in response['Items']: print(item) # No LastEvaluatedKey means no more items to retrieve if 'LastEvaluatedKey' not in response: break # If there are possibly more items, update the start key for the next page query_params['ExclusiveStartKey'] = response['LastEvaluatedKey']

为方便起见,boto3 可以用分页工具为您执行此操作。但是,它仅适用于客户端接口。以下是为使用分页工具而重写的代码:

import boto3 dynamodb = boto3.client('dynamodb') paginator = dynamodb.get_paginator('query') query_params = { 'TableName': 'YourTableName', 'KeyConditionExpression': 'pk = :pk_val AND sk > :sk_val', 'ExpressionAttributeValues': { ':pk_val': {'S': '123'}, ':sk_val': {'N': '1000'}, 'Limit': 100 page_iterator = paginator.paginate(**query_params) for page in page_iterator: # Process the items however you like for item in page['Items']: print(item)

有关更多信息,请参阅分页工具指南DynamoDB.Paginator.Query 的 API 参考

注意

分页工具也有自己的配置设置,名为 MaxItemsStartingTokenPageSize。要使用 DynamoDB 进行分页,则应忽略这些设置。

Waiter

通过 Waiter 可以等待某件事完成后再继续操作。目前,它们仅支持等待创建或删除表。在后台,Waiter 操作每 20 秒为您检查一次,最多 25 次。您可以自己做,但是在编写自动化时使用 Waiter 会很舒服。

以下代码演示了如何等待创建特定表:

# Create a table, wait until it exists, and print its ARN response = client.create_table(...)