重要
Microsoft Azure Cosmos DB for MongoDB vCore 中的 Entra ID 身份验证目前为预览版。 此预览版在没有服务级别协议的情况下提供,不建议用于生产工作负荷。 某些功能不受支持或功能有限。
本指南逐步讲解如何构建 Python 控制台应用程序以连接到 Azure Cosmos DB for MongoDB vCore 群集。 配置开发环境,使用 azure.identity
用于 Python 的 Azure SDK 中的包进行身份验证,并执行创建、查询和更新文档等作。
- 现有的 Azure Cosmos DB for MongoDB (vCore) 群集。
-
- 如果想要在本地运行 CLI 引用命令,请使用
az login
该命令登录到 Azure CLI。
- 如果想要在本地运行 CLI 引用命令,请使用
接下来,创建新的控制台应用程序项目,并导入必要的库以向群集进行身份验证。
为项目创建新目录并设置虚拟环境。
mkdir cosmos-mongodb-app cd cosmos-mongodb-app python -m venv .venv
激活虚拟环境。
# On Windows .venv\Scripts\activate # On macOS/Linux source .venv/bin/activate
为应用程序创建新的 Python 文件。
touch app.py
安装用于 Azure 身份验证的
azure.identity
库。pip install azure.identity
安装适用于 Python 的
pymongo
驱动程序。pip install pymongo
现在,使用 Azure.Identity
库获取一个 TokenCredential
,以用于连接到您的群集。 官方 MongoDB 驱动程序具有一个特殊接口,必须实现该接口,以便从 Microsoft Entra 获取令牌,以便在连接到群集时使用。
在 Python 文件顶部导入必要的模块。
from azure.identity import DefaultAzureCredential from pymongo import MongoClient from pymongo.auth_oidc import OIDCCallback, OIDCCallbackContext, OIDCCallbackResult
创建实现 MongoDB OpenID Connect (OIDC) 回调接口的自定义类。
class AzureIdentityTokenCallback(OIDCCallback): def __init__(self, credential): self.credential = credential def fetch(self, context: OIDCCallbackContext) -> OIDCCallbackResult: token = self.credential.get_token( "https://ossrdbms-aad.database.chinacloudapi.cn/.default").token return OIDCCallbackResult(access_token=token)
设置群集名称变量。
clusterName = "<azure-cosmos-db-mongodb-vcore-cluster-name>"
创建 DefaultAzureCredential 的实例并设置身份验证属性。
credential = DefaultAzureCredential() authProperties = {"OIDC_CALLBACK": AzureIdentityTokenCallback(credential)}
创建配置了 Microsoft Entra 身份验证的 MongoDB 客户端。
client = MongoClient( f"mongodb+srv://{clusterName}.global.mongocluster.cosmos.azure.com/", connectTimeoutMS=120000, tls=True, retryWrites=True, authMechanism="MONGODB-OIDC", authMechanismProperties=authProperties ) print("Client created")
最后,使用官方库对数据库、集合和文档执行常见任务。 在这里,你将使用相同的类和方法与 MongoDB 或 DocumentDB 进行交互来管理集合和项。
获取对数据库的引用。
database = client.get_database("<database-name>") print("Database pointer created")
获取对集合的引用。
collection = database.get_collection("<container-name>") print("Collection pointer created")
创建文档并使用 将其更新插入到集合中。
collection.update_one
new_document = { "_id": "aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb", "category": "gear-surf-surfboards", "name": "Yamba Surfboard", "quantity": 12, "price": 850.00, "clearance": False, } filter = { "_id": "aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb", } payload = { "$set": new_document } result = collection.update_one(filter, payload, upsert=True)
用于
collection.find_one
从集合中检索特定文档。filter = { "_id": "aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb", "category": "gear-surf-surfboards" } existing_document = collection.find_one(filter) print(f"Read document _id:\t{existing_document['_id']}")
查询与
collection.find
筛选器匹配的多个文档。filter = { "category": "gear-surf-surfboards" } matched_documents = collection.find(filter) for document in matched_documents: print(f"Found document:\t{document}")