通过 Python 开始使用适用于 NoSQL 的 Azure Cosmos DB

适用范围: NoSQL

本文介绍如何使用 Python SDK 连接到 Azure Cosmos DB for NoSQL。 连接后,可对数据库、容器和项执行操作。

包 (PyPi) | 示例 | API 参考 | 库源代码 | 反馈

先决条件

设置项目

创建可运行 Python 代码的环境。

通过虚拟环境可以在隔离的环境中安装 Python 包,而不会影响系统的其余部分。

在虚拟环境中安装 Azure Cosmos DB for NoSQL Python SDK。

pip install azure-cosmos

创建 Python 应用程序

在你的环境中,创建新的 app.py 文件并在其中添加以下代码:

"""Sample showing how to connect with endpoint and key."""
# <imports>
import json
import os
import sys
import uuid

from azure.core.exceptions import AzureError
from azure.cosmos import CosmosClient, PartitionKey

# </imports>

DATABASE_ID = "cosmicworks"
CONTAINER_ID = "products"
# <client>
ENDPOINT = os.environ["COSMOS_ENDPOINT"]
KEY = os.environ["COSMOS_KEY"]

client = CosmosClient(url=ENDPOINT, credential=KEY)
# </client>


def main():
    """How to CosmosDB and NoSQL samples."""
    try:
        # Create database and partition key.
        database = client.create_database_if_not_exists(id=DATABASE_ID)

        # Create a container.
        partition_key_path = PartitionKey(path="/categoryId")
        container = database.create_container_if_not_exists(
            id=CONTAINER_ID,
            partition_key=partition_key_path,
            offer_throughput=400,
        )

        # Create a new item.
        new_guid = str(uuid.uuid4())
        new_item = {
            "id": new_guid,
            "categoryId": "61dba35b-4f02-45c5-b648-c6badc0cbd79",
            "categoryName": "gear-surf-surfboards",
            "name": "Yamba Surfboard",
            "quantity": 12,
            "sale": False,
        }
        container.create_item(new_item)

        # Query items.
        sql_stmt = "SELECT * FROM products p WHERE p.categoryId = @categoryId"
        category_id = "61dba35b-4f02-45c5-b648-c6badc0cbd79"
        params = [dict(name="@categoryId", value=category_id)]

        items = container.query_items(
            query=sql_stmt,
            parameters=params,
            enable_cross_partition_query=False,
        )

        for item in items:
            print(json.dumps(item, indent=True))

    except AzureError as err:
        sys.exit("Error:" + str(err))


if __name__ == "__main__":
    main()

以上代码将导入你要在本文余下内容中使用的模块。

连接到 Azure Cosmos DB for NoSQL

若要连接到 Azure Cosmos DB 的 NoSQL API,请创建 CosmosClient 类的实例。 此类是针对数据库执行所有操作的起点。 可以通过三种方式使用 CosmosClient 类连接到 API for NoSQL 帐户:

使用终结点和密钥连接

CosmosClient 的构造函数有两个必需的参数:

参数 示例值 说明
url COSMOS_ENDPOINT 环境变量 用于所有请求的 API for NoSQL 终结点。
credential COSMOS_KEY 环境变量 身份验证时要使用的帐户密钥或资源令牌。

检索帐户终结点和密钥

  1. 为 resourceGroupName 创建 shell 变量。

    # Variable for resource group name
    resourceGroupName="msdocs-cosmos-python-howto-rg"
    
  2. 使用 az cosmosdb list 命令检索资源组中第一个 Azure Cosmos DB 帐户的名称,并将其存储在 accountName shell 变量中。

    # Retrieve most recently created account name
    accountName=$(
        az cosmosdb list \
            --resource-group $resourceGroupName \
            --query "[0].name" \
            --output tsv
    )
    
  3. 使用 az cosmosdb show 命令获取帐户的 API for NoSQL 终结点 URI。

    az cosmosdb show \
        --resource-group $resourceGroupName \
        --name $accountName \
        --query "documentEndpoint"
    
  4. 使用 az-cosmosdb-keys-list 命令从帐户的密钥列表中查找主密钥。

    az cosmosdb keys list \
        --resource-group $resourceGroupName \
        --name $accountName \
        --type "keys" \
        --query "primaryMasterKey"
    
  5. 记录 URI 和主密钥值。 稍后将使用这些凭据。

若要在 Python 代码中使用“URI”和“主密钥”值,请将其保存到运行应用程序的本地计算机上的新环境变量。

$env:COSMOS_ENDPOINT = "<cosmos-account-URI>"
$env:COSMOS_KEY = "<cosmos-account-PRIMARY-KEY>"

使用帐户终结点和密钥创建 CosmosClient

使用 COSMOS_ENDPOINTCOSMOS_KEY 环境变量作为参数,创建 CosmosClient 类的新实例。

"""Sample showing how to connect with endpoint and key."""
# <imports>
import json
import os
import sys
import uuid

from azure.core.exceptions import AzureError
from azure.cosmos import CosmosClient, PartitionKey

# </imports>

DATABASE_ID = "cosmicworks"
CONTAINER_ID = "products"
# <client>
ENDPOINT = os.environ["COSMOS_ENDPOINT"]
KEY = os.environ["COSMOS_KEY"]

client = CosmosClient(url=ENDPOINT, credential=KEY)
# </client>


def main():
    """How to CosmosDB and NoSQL samples."""
    try:
        # Create database and partition key.
        database = client.create_database_if_not_exists(id=DATABASE_ID)

        # Create a container.
        partition_key_path = PartitionKey(path="/categoryId")
        container = database.create_container_if_not_exists(
            id=CONTAINER_ID,
            partition_key=partition_key_path,
            offer_throughput=400,
        )

        # Create a new item.
        new_guid = str(uuid.uuid4())
        new_item = {
            "id": new_guid,
            "categoryId": "61dba35b-4f02-45c5-b648-c6badc0cbd79",
            "categoryName": "gear-surf-surfboards",
            "name": "Yamba Surfboard",
            "quantity": 12,
            "sale": False,
        }
        container.create_item(new_item)

        # Query items.
        sql_stmt = "SELECT * FROM products p WHERE p.categoryId = @categoryId"
        category_id = "61dba35b-4f02-45c5-b648-c6badc0cbd79"
        params = [dict(name="@categoryId", value=category_id)]

        items = container.query_items(
            query=sql_stmt,
            parameters=params,
            enable_cross_partition_query=False,
        )

        for item in items:
            print(json.dumps(item, indent=True))

    except AzureError as err:
        sys.exit("Error:" + str(err))


if __name__ == "__main__":
    main()

使用连接字符串连接

CosmosClient 类有一个 from_connection_string 方法,可以使用该方法通过一个必需参数进行连接:

参数 示例值 说明
conn_str COSMOS_CONNECTION_STRING 环境变量 API for NoSQL 帐户的连接字符串。
credential COSMOS_KEY 环境变量 一个可选的帐户密钥或资源令牌,用于代替连接字符串中的帐户密钥或资源令牌。

检索帐户连接字符串

  1. 使用 az cosmosdb list 命令检索资源组中第一个 Azure Cosmos DB 帐户的名称,并将其存储在 accountName shell 变量中。

    # Retrieve most recently created account name
    accountName=$(
        az cosmosdb list \
            --resource-group $resourceGroupName \
            --query "[0].name" \
            --output tsv
    )
    
  2. 使用 az-cosmosdb-keys-list 命令,从帐户的连接字符串列表中查找主连接字符串。

    az cosmosdb keys list \
        --resource-group $resourceGroupName \
        --name $accountName \
        --type "connection-strings" \
        --query "connectionStrings[?description == \`Primary SQL Connection String\`] | [0].connectionString"
    

若要在 Python 代码中使用“主连接字符串”值,请将其保存到运行应用程序的本地计算机上的新环境变量。

$env:COSMOS_CONNECTION_STRING = "<cosmos-account-PRIMARY-CONNECTION-STRING>"

使用连接字符串创建 CosmosClient

使用 COSMOS_CONNECTION_STRING 环境变量作为唯一参数,创建 CosmosClient 类的新实例。

"""Sample showing how to connect with connection string."""
import json
import os
import sys
import uuid

from azure.core.exceptions import AzureError
from azure.cosmos import CosmosClient, PartitionKey

DATABASE_ID = "cosmicworks"
CONTAINER_ID = "products"
# <connection_string>
CONN_STR = os.environ["COSMOS_CONNECTION_STRING"]

client = CosmosClient.from_connection_string(conn_str=CONN_STR)
# </connection_string>


def main():
    """How to CosmosDB and NoSQL samples."""
    try:
        # Create database and partition key.
        database = client.create_database_if_not_exists(id=DATABASE_ID)

        # Create a container.
        partition_key_path = PartitionKey(path="/categoryId")
        container = database.create_container_if_not_exists(
            id=CONTAINER_ID,
            partition_key=partition_key_path,
            offer_throughput=400,
        )

        # Create a new item.
        new_guid = str(uuid.uuid4())
        new_item = {
            "id": new_guid,
            "categoryId": "61dba35b-4f02-45c5-b648-c6badc0cbd79",
            "categoryName": "gear-surf-surfboards",
            "name": "Yamba Surfboard",
            "quantity": 12,
            "sale": False,
        }
        container.create_item(new_item)

        # Query items.
        sql_stmt = "SELECT * FROM products p WHERE p.categoryId = @categoryId"
        category_id = "61dba35b-4f02-45c5-b648-c6badc0cbd79"
        params = [dict(name="@categoryId", value=category_id)]

        items = container.query_items(
            query=sql_stmt,
            parameters=params,
            enable_cross_partition_query=False,
        )

        for item in items:
            print(json.dumps(item, indent=True))

    except AzureError as err:
        sys.exit("Error:" + str(err))


if __name__ == "__main__":
    main()

使用 Microsoft 标识平台进行连接

若要使用 Microsoft 标识平台和 Microsoft Entra ID 连接到 API for NoSQL 帐户,请使用安全主体。 主体的确切类型将取决于应用程序代码的托管位置。 下表可用作快速参考指南。

应用程序的运行位置 安全主体
本地计算机(开发和测试) 用户标识或服务主体
Azure 托管标识
Azure 外部的服务器或客户端 服务主体

导入 Azure.Identity

azure-identity 包包含所有 Azure SDK 库之间共享的核心身份验证功能。

azure-identity 包导入你的环境。

pip install azure-identity

使用默认凭据实现创建 CosmosClient

如果在本地计算机上进行测试,或者应用程序将在 Azure 服务上运行,并直接支持托管标识,则通过创建 DefaultAzureCredential 实例来获取 OAuth 令牌。

在 app.py 中:

"""Sample showing how to connect with AAD."""
import json
import os
import sys
import uuid

from azure.core.exceptions import AzureError
from azure.cosmos import CosmosClient

# <credential>
from azure.identity import DefaultAzureCredential

ENDPOINT = os.environ["COSMOS_ENDPOINT"]

credential = DefaultAzureCredential()

client = CosmosClient(ENDPOINT, credential)
# </credential>

DATABASE_ID = "cosmicworks"
CONTAINER_ID = "products"


def main():
    """How to CosmosDB and NoSQL samples."""
    try:
        # Get database.
        database = client.get_database_client(DATABASE_ID)

        # Get container.
        container = database.get_container_client(CONTAINER_ID)
        print("Container info: " + str(container.read()))

        # Create a new item.
        new_guid = str(uuid.uuid4())
        new_item = {
            "id": new_guid,
            "categoryId": "61dba35b-4f02-45c5-b648-c6badc0cbd79",
            "categoryName": "gear-surf-surfboards",
            "name": "Yamba Surfboard",
            "quantity": 12,
            "sale": False,
        }
        container.create_item(new_item)

        # Query items.
        sql_stmt = "SELECT * FROM products p WHERE p.categoryId = @categoryId"
        category_id = "61dba35b-4f02-45c5-b648-c6badc0cbd79"
        params = [dict(name="@categoryId", value=category_id)]

        items = container.query_items(
            query=sql_stmt,
            parameters=params,
            enable_cross_partition_query=False,
        )

        for item in items:
            print(json.dumps(item, indent=True))

    except AzureError as err:
        sys.exit("Error:" + str(err))


if __name__ == "__main__":
    main()

重要

有关如何添加正确的角色以使 DefaultAzureCredential 正常运行的详细信息,请参阅使用 Microsoft Entra ID 为 Azure Cosmos DB 帐户配置基于角色的访问控制。 具体而言,请参阅有关创建角色并将其分配到主体 ID 的部分。

使用自定义凭据实现创建 CosmosClient

如果计划在 Azure 之外部署应用程序,则可以通过使用适用于 Python 的 Azure.Identity 客户端库中的其他类来获取 OAuth 令牌。 这些其他类也派生自 TokenCredential 类。

在此示例中,我们使用客户端和租户标识符以及客户端密码来创建 ClientSecretCredential 实例。

在 app.py 中:

  • 从服务主体的环境变量中获取凭据信息。 在 Microsoft Entra ID 中注册应用程序时,可以获取客户端 ID、租户 ID 和客户端密码。 若要详细了解如何注册 Microsoft Entra 应用程序,请参阅将应用程序注册到 Microsoft 标识平台

  • 导入 ClientSecretCredential,并使用 TENANT_IDCLIENT_IDCLIENT_SECRET 环境变量作为参数创建一个实例。

  • 使用终结点和凭据作为参数创建 CosmosClient 类的新实例。

"""Sample showing how to connect with AAD service principal."""
import json
import os
import sys
import uuid

from azure.core.exceptions import AzureError
from azure.cosmos import CosmosClient

# <credential>
from azure.identity import ClientSecretCredential

ENDPOINT = os.environ["COSMOS_ENDPOINT"]
TENANT_ID = os.environ["TENANT_ID"]
CLIENT_ID = os.environ["CLIENT_ID"]
CLIENT_SECRET = os.environ["CLIENT_SECRET"]

credential = ClientSecretCredential(
    tenant_id=TENANT_ID, client_id=CLIENT_ID, client_secret=CLIENT_SECRET
)

client = CosmosClient(ENDPOINT, credential)
# </credential>

DATABASE_ID = "cosmicworks"
CONTAINER_ID = "products"


def main():
    """How to CosmosDB and NoSQL samples."""
    try:
        # Get database.
        database = client.get_database_client(DATABASE_ID)

        # Get container.
        container = database.get_container_client(CONTAINER_ID)
        print("Container info: " + str(container.read()))

        # Create a new item.
        new_guid = str(uuid.uuid4())
        new_item = {
            "id": new_guid,
            "categoryId": "61dba35b-4f02-45c5-b648-c6badc0cbd79",
            "categoryName": "gear-surf-surfboards",
            "name": "Yamba Surfboard",
            "quantity": 12,
            "sale": False,
        }
        container.create_item(new_item)

        # Query items.
        sql_stmt = "SELECT * FROM products p WHERE p.categoryId = @categoryId"
        category_id = "61dba35b-4f02-45c5-b648-c6badc0cbd79"
        params = [dict(name="@categoryId", value=category_id)]

        items = container.query_items(
            query=sql_stmt,
            parameters=params,
            enable_cross_partition_query=False,
        )

        for item in items:
            print(json.dumps(item, indent=True))

    except AzureError as err:
        sys.exit("Error:" + str(err))


if __name__ == "__main__":
    main()

生成应用程序

在生成应用程序时,代码主要与四种类型的资源交互:

  • API for NoSQL 帐户,它是 Azure Cosmos DB 数据的唯一顶级命名空间。

  • 数据库,它用于组织帐户中的容器。

  • 容器,它包含数据库中的一组单个项。

  • 项,它表示容器中的 JSON 文档。

以下图示显示了这些资源之间的关系。

Diagram of the Azure Cosmos DB hierarchy including accounts, databases, containers, and items.

顶部是显示 Azure Cosmos DB 帐户的层次结构示意图。 该帐户包含两个子数据库节点。 其中一个数据库节点包含两个子容器节点。 另一个数据库节点包含单个子容器节点。 该容器节点包含三个子项节点。

每种类型的资源由一个或多个关联的 Python 类表示。 下面是最常用的同步编程类的列表。 (azure.cosmos.aio 命名空间中有类似的异步编程类。)

说明
CosmosClient 此类为 Azure Cosmos DB 服务提供客户端逻辑表示。 此客户端对象用于对服务进行配置和执行请求。
DatabaseProxy 不一定在服务中存在的数据库接口。 不应直接实例化此类。 应改用 CosmosClient get_database_client 方法。
ContainerProxy 与特定 Cosmos DB 容器交互的接口。 不应直接实例化此类。 请改用 DatabaseProxy get_container_client 方法获取现有容器,或改用 create_container 方法创建新容器。

以下指南介绍了如何使用上述每个类来生成应用程序。

指南 说明
创建数据库 创建数据库
创建容器 创建容器
项示例 点读特定项

另请参阅

后续步骤

现在,你已连接到 API for NoSQL 帐户,请使用下一指南创建和管理数据库。