使用 Python 管理 MongoDB 数据库
适用对象: MongoDB
Azure Cosmos DB 中的 MongoDB 服务器可通过适用于 MongoDB 的常见 Python 包访问,例如 PyMongo。
注意
示例代码片段在 GitHub 上作为 Python 项目提供。
为数据库命名
在 Azure Cosmos DB 中,数据库类似于命名空间。 创建数据库时,数据库名称会形成用于访问数据库资源和任何子资源的 URI 段。
下面是命名数据库时的一些快速规则:
- 数据库名称长度必须介于 3 到 63 个字符之间。
- 数据库名称只能包含小写字母、数字或短划线 (-) 字符。
- 数据库名称必须以小写字母或数字开头。
创建后,数据库的 URI 采用以下格式:
https://<cosmos-account-name>.documents.azure.cn/dbs/<database-name>
获取数据库实例
数据库保存集合及其文档。 若要访问数据库,请使用 MongoClient 的属性样式访问或字典样式访问。 有关详细信息,请参阅获取数据库。
下面的代码片段假设你已创建客户端连接,并且在这些代码片段之后关闭了客户端连接。
获取服务器信息
使用 MongoClient 类的 server_info 方法访问服务器信息。 无需指定数据库名称即可获取此信息。 返回的信息特定于 MongoDB,不代表 Azure Cosmos DB 平台本身。
还可以使用 MongoClient.list_database_names 方法列出数据库,并使用数据库对象的 command 方法(例如 client.admin.command)向数据库发出 MongoDB 命令。
# ----------------------------------------------------------------------------------------------------------
# Prerequisites:
#
# 1. An Azure Cosmos DB API for MongoDB Account.
# 2. PyMongo installed.
# 3. python-dotenv installed (to load environment variables from a .env file).
# ----------------------------------------------------------------------------------------------------------
# Code samples are used in documentation, do not change unless synchronizing with documentation.
# ----------------------------------------------------------------------------------------------------------
import os
import sys
import pymongo
from dotenv import load_dotenv
def main():
    """Connect to the API for MongoDB and show admin commands.

    Loads environment variables with ``load_dotenv``, reads the connection
    string from ``COSMOS_CONNECTION_STRING``, validates the connection, and
    then prints the server information, the ``serverStatus`` result of the
    ``admin`` database, and the list of database names.

    Raises:
        SystemExit: If the connection string is not set, the server cannot
            be reached, or any other error occurs while the sample runs.
    """
    client = None
    try:
        load_dotenv()
        CONNECTION_STRING = os.environ.get("COSMOS_CONNECTION_STRING")
        if not CONNECTION_STRING:
            # MongoClient(None) falls back to localhost instead of failing,
            # so report the missing setting explicitly.
            sys.exit("Error: COSMOS_CONNECTION_STRING is not set")
        client = pymongo.MongoClient(CONNECTION_STRING)

        # Validate the connection before issuing any other command so that
        # connection problems are reported by the dedicated handler below.
        try:
            client.server_info()  # validate connection string
        except (
            pymongo.errors.OperationFailure,
            pymongo.errors.ConnectionFailure,
            pymongo.errors.ExecutionTimeout,
        ) as err:
            sys.exit("Can't connect:" + str(err))

        # <server_info>
        # Get server information
        for k, v in client.server_info().items():
            print("Key: {} , Value: {}".format(k, v))

        # Get server status of admin database
        print("Server status {}".format(client.admin.command("serverStatus")))

        # List databases
        databases = client.list_database_names()
        print("Databases: {}".format(databases))
        # </server_info>
    except Exception as err:
        sys.exit("Error:" + str(err))
    finally:
        # Always release the connection, including on the sys.exit paths.
        if client is not None:
            # <client_disconnect>
            client.close()
            # </client_disconnect>


if __name__ == "__main__":
    main()
"""
# <console_result>
Key: version , Value: 3.6.0
Key: versionArray , Value: [3, 6, 0, 0]
Key: bits , Value: 64
Key: maxBsonObjectSize , Value: 16777216
Key: ok , Value: 1.0
Server status {'ok': 1.0}
Databases: ['adventureworks']
# </console_result>
"""
前面的代码片段显示的输出类似于以下示例控制台输出:
# ----------------------------------------------------------------------------------------------------------
# Prerequisites:
#
# 1. An Azure Cosmos DB API for MongoDB Account.
# 2. PyMongo installed.
# 3. python-dotenv installed (to load environment variables from a .env file).
# ----------------------------------------------------------------------------------------------------------
# Code samples are used in documentation, do not change unless synchronizing with documentation.
# ----------------------------------------------------------------------------------------------------------
import os
import sys
import pymongo
from dotenv import load_dotenv
def main():
    """Connect to the API for MongoDB and show admin commands.

    Loads environment variables with ``load_dotenv``, reads the connection
    string from ``COSMOS_CONNECTION_STRING``, validates the connection, and
    then prints the server information, the ``serverStatus`` result of the
    ``admin`` database, and the list of database names.

    Raises:
        SystemExit: If the connection string is not set, the server cannot
            be reached, or any other error occurs while the sample runs.
    """
    client = None
    try:
        load_dotenv()
        CONNECTION_STRING = os.environ.get("COSMOS_CONNECTION_STRING")
        if not CONNECTION_STRING:
            # MongoClient(None) falls back to localhost instead of failing,
            # so report the missing setting explicitly.
            sys.exit("Error: COSMOS_CONNECTION_STRING is not set")
        client = pymongo.MongoClient(CONNECTION_STRING)

        # Validate the connection before issuing any other command so that
        # connection problems are reported by the dedicated handler below.
        try:
            client.server_info()  # validate connection string
        except (
            pymongo.errors.OperationFailure,
            pymongo.errors.ConnectionFailure,
            pymongo.errors.ExecutionTimeout,
        ) as err:
            sys.exit("Can't connect:" + str(err))

        # <server_info>
        # Get server information
        for k, v in client.server_info().items():
            print("Key: {} , Value: {}".format(k, v))

        # Get server status of admin database
        print("Server status {}".format(client.admin.command("serverStatus")))

        # List databases
        databases = client.list_database_names()
        print("Databases: {}".format(databases))
        # </server_info>
    except Exception as err:
        sys.exit("Error:" + str(err))
    finally:
        # Always release the connection, including on the sys.exit paths.
        if client is not None:
            # <client_disconnect>
            client.close()
            # </client_disconnect>


if __name__ == "__main__":
    main()
"""
# <console_result>
Key: version , Value: 3.6.0
Key: versionArray , Value: [3, 6, 0, 0]
Key: bits , Value: 64
Key: maxBsonObjectSize , Value: 16777216
Key: ok , Value: 1.0
Server status {'ok': 1.0}
Databases: ['adventureworks']
# </console_result>
"""
数据库是否存在?
如果你访问数据库时数据库不存在,适用于 Python 的 PyMongo 驱动程序会创建数据库。 但是,我们建议你改用 MongoDB 扩展命令来管理存储在 Azure Cosmos DB API for MongoDB 中的数据。 若要创建新数据库(如果数据库不存在),请使用创建数据库扩展,如以下代码片段所示。
若要在使用数据库之前查看数据库是否已存在,请使用 list_database_names 方法获取当前数据库的列表。
# ----------------------------------------------------------------------------------------------------------
# Prerequisites:
#
# 1. An Azure Cosmos DB API for MongoDB Account.
# 2. PyMongo installed.
# 3. python-dotenv installed (to load environment variables from a .env file).
# ----------------------------------------------------------------------------------------------------------
# Code samples are used in documentation, do not change unless synchronizing with documentation.
# ----------------------------------------------------------------------------------------------------------
import os
import sys
import pymongo
from dotenv import load_dotenv
def main():
    """Connect to the API for MongoDB and check if database exists.

    Loads environment variables with ``load_dotenv``, reads the connection
    string from ``COSMOS_CONNECTION_STRING``, and validates the connection.
    It then creates the ``adventureworks`` database and the ``products``
    collection through the ``customAction`` commands when they are not
    already present, printing what it found or created.

    Raises:
        SystemExit: If the connection string is not set, the server cannot
            be reached, or any other error occurs while the sample runs.
    """
    client = None
    try:
        load_dotenv()
        CONNECTION_STRING = os.environ.get("COSMOS_CONNECTION_STRING")
        if not CONNECTION_STRING:
            # MongoClient(None) falls back to localhost instead of failing,
            # so report the missing setting explicitly.
            sys.exit("Error: COSMOS_CONNECTION_STRING is not set")
        client = pymongo.MongoClient(CONNECTION_STRING)

        # Validate the connection before issuing any other command so that
        # connection problems are reported by the dedicated handler below.
        try:
            client.server_info()  # validate connection string
        except (
            pymongo.errors.OperationFailure,
            pymongo.errors.ConnectionFailure,
            pymongo.errors.ExecutionTimeout,
        ) as err:
            sys.exit("Can't connect:" + str(err))

        # <does_database_exist>
        # Get list of databases
        databases = client.list_database_names()
        if not databases:
            print("No databases found")

        # Does database exist?
        DB_NAME = "adventureworks"
        if DB_NAME in databases:
            print("Database exists: {}".format(DB_NAME))
        else:
            client[DB_NAME].command({"customAction": "CreateDatabase", "offerThroughput": 400})
            print("Created db '{}' with shared throughput.\n".format(DB_NAME))
        # </does_database_exist>

        # <does_collection_exist>
        COLL_NAME = "products"
        if COLL_NAME in client[DB_NAME].list_collection_names():
            print("Collection exists: {}".format(COLL_NAME))
        else:
            client[DB_NAME].command({"customAction": "CreateCollection", "collection": COLL_NAME})
            print("Created collection '{}'.\n".format(COLL_NAME))
        # </does_collection_exist>
    except Exception as err:
        sys.exit("Error:" + str(err))
    finally:
        # Always release the connection, including on the sys.exit paths.
        if client is not None:
            client.close()


if __name__ == "__main__":
    main()
"""
# <console_result>
Database exists: adventureworks
# </console_result>
"""
前面的代码片段显示的输出类似于以下示例控制台输出:
# ----------------------------------------------------------------------------------------------------------
# Prerequisites:
#
# 1. An Azure Cosmos DB API for MongoDB Account.
# 2. PyMongo installed.
# 3. python-dotenv installed (to load environment variables from a .env file).
# ----------------------------------------------------------------------------------------------------------
# Code samples are used in documentation, do not change unless synchronizing with documentation.
# ----------------------------------------------------------------------------------------------------------
import os
import sys
import pymongo
from dotenv import load_dotenv
def main():
    """Connect to the API for MongoDB and check if database exists.

    Loads environment variables with ``load_dotenv``, reads the connection
    string from ``COSMOS_CONNECTION_STRING``, and validates the connection.
    It then creates the ``adventureworks`` database and the ``products``
    collection through the ``customAction`` commands when they are not
    already present, printing what it found or created.

    Raises:
        SystemExit: If the connection string is not set, the server cannot
            be reached, or any other error occurs while the sample runs.
    """
    client = None
    try:
        load_dotenv()
        CONNECTION_STRING = os.environ.get("COSMOS_CONNECTION_STRING")
        if not CONNECTION_STRING:
            # MongoClient(None) falls back to localhost instead of failing,
            # so report the missing setting explicitly.
            sys.exit("Error: COSMOS_CONNECTION_STRING is not set")
        client = pymongo.MongoClient(CONNECTION_STRING)

        # Validate the connection before issuing any other command so that
        # connection problems are reported by the dedicated handler below.
        try:
            client.server_info()  # validate connection string
        except (
            pymongo.errors.OperationFailure,
            pymongo.errors.ConnectionFailure,
            pymongo.errors.ExecutionTimeout,
        ) as err:
            sys.exit("Can't connect:" + str(err))

        # <does_database_exist>
        # Get list of databases
        databases = client.list_database_names()
        if not databases:
            print("No databases found")

        # Does database exist?
        DB_NAME = "adventureworks"
        if DB_NAME in databases:
            print("Database exists: {}".format(DB_NAME))
        else:
            client[DB_NAME].command({"customAction": "CreateDatabase", "offerThroughput": 400})
            print("Created db '{}' with shared throughput.\n".format(DB_NAME))
        # </does_database_exist>

        # <does_collection_exist>
        COLL_NAME = "products"
        if COLL_NAME in client[DB_NAME].list_collection_names():
            print("Collection exists: {}".format(COLL_NAME))
        else:
            client[DB_NAME].command({"customAction": "CreateCollection", "collection": COLL_NAME})
            print("Created collection '{}'.\n".format(COLL_NAME))
        # </does_collection_exist>
    except Exception as err:
        sys.exit("Error:" + str(err))
    finally:
        # Always release the connection, including on the sys.exit paths.
        if client is not None:
            client.close()


if __name__ == "__main__":
    main()
"""
# <console_result>
Database exists: adventureworks
# </console_result>
"""
获取数据库、集合和文档计数的列表
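以编程方式管理 MongoDB 服务器时,了解服务器上的数据库和集合以及每个集合中的文档数很有帮助。 有关详细信息,请参阅 PyMongo 文档中的 MongoClient.list_database_names、Database.list_collection_names 和 Collection.count_documents 方法。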
# ----------------------------------------------------------------------------------------------------------
# Prerequisites:
#
# 1. An Azure Cosmos DB API for MongoDB Account.
# 2. PyMongo installed.
# 3. python-dotenv installed (to load environment variables from a .env file).
# ----------------------------------------------------------------------------------------------------------
# Code samples are used in documentation, do not change unless synchronizing with documentation.
# ----------------------------------------------------------------------------------------------------------
import os
import sys
import pymongo
from dotenv import load_dotenv
def main():
    """Connect to the API for MongoDB and get document count.

    Loads environment variables with ``load_dotenv``, reads the connection
    string from ``COSMOS_CONNECTION_STRING``, and validates the connection.
    It then prints every database name, each collection name in that
    database, and the number of documents in each collection.

    Raises:
        SystemExit: If the connection string is not set, the server cannot
            be reached, or any other error occurs while the sample runs.
    """
    client = None
    try:
        load_dotenv()
        CONNECTION_STRING = os.environ.get("COSMOS_CONNECTION_STRING")
        if not CONNECTION_STRING:
            # MongoClient(None) falls back to localhost instead of failing,
            # so report the missing setting explicitly.
            sys.exit("Error: COSMOS_CONNECTION_STRING is not set")
        client = pymongo.MongoClient(CONNECTION_STRING)

        # Validate the connection before issuing any other command so that
        # connection problems are reported by the dedicated handler below.
        try:
            client.server_info()  # validate connection string
        except (
            pymongo.errors.OperationFailure,
            pymongo.errors.ConnectionFailure,
            pymongo.errors.ExecutionTimeout,
        ) as err:
            sys.exit("Can't connect:" + str(err))

        # <database_object>
        # Get list of databases
        databases = client.list_database_names()

        # Loop through databases
        for db in databases:
            print("Database: {}".format(db))

            # Get list of collections
            collections = client[db].list_collection_names()

            # Loop through collections
            for col in collections:
                print("\tCollection: {}".format(col))

                # Get document count
                doc_count = client[db][col].count_documents({})
                print("\tDocument count: {}".format(doc_count))
        # </database_object>
    except Exception as err:
        sys.exit("Error:" + str(err))
    finally:
        # Always release the connection, including on the sys.exit paths.
        if client is not None:
            client.close()


if __name__ == "__main__":
    main()
"""
# <console_result>
Database: adventureworks
Collection: products_new
Document count: 1
Collection: products
Document count: 3
Database: testdb
Collection: mycoll
Document count: 1
# </console_result>
"""
前面的代码片段显示的输出类似于以下示例控制台输出:
# ----------------------------------------------------------------------------------------------------------
# Prerequisites:
#
# 1. An Azure Cosmos DB API for MongoDB Account.
# 2. PyMongo installed.
# 3. python-dotenv installed (to load environment variables from a .env file).
# ----------------------------------------------------------------------------------------------------------
# Code samples are used in documentation, do not change unless synchronizing with documentation.
# ----------------------------------------------------------------------------------------------------------
import os
import sys
import pymongo
from dotenv import load_dotenv
def main():
    """Connect to the API for MongoDB and get document count.

    Loads environment variables with ``load_dotenv``, reads the connection
    string from ``COSMOS_CONNECTION_STRING``, and validates the connection.
    It then prints every database name, each collection name in that
    database, and the number of documents in each collection.

    Raises:
        SystemExit: If the connection string is not set, the server cannot
            be reached, or any other error occurs while the sample runs.
    """
    client = None
    try:
        load_dotenv()
        CONNECTION_STRING = os.environ.get("COSMOS_CONNECTION_STRING")
        if not CONNECTION_STRING:
            # MongoClient(None) falls back to localhost instead of failing,
            # so report the missing setting explicitly.
            sys.exit("Error: COSMOS_CONNECTION_STRING is not set")
        client = pymongo.MongoClient(CONNECTION_STRING)

        # Validate the connection before issuing any other command so that
        # connection problems are reported by the dedicated handler below.
        try:
            client.server_info()  # validate connection string
        except (
            pymongo.errors.OperationFailure,
            pymongo.errors.ConnectionFailure,
            pymongo.errors.ExecutionTimeout,
        ) as err:
            sys.exit("Can't connect:" + str(err))

        # <database_object>
        # Get list of databases
        databases = client.list_database_names()

        # Loop through databases
        for db in databases:
            print("Database: {}".format(db))

            # Get list of collections
            collections = client[db].list_collection_names()

            # Loop through collections
            for col in collections:
                print("\tCollection: {}".format(col))

                # Get document count
                doc_count = client[db][col].count_documents({})
                print("\tDocument count: {}".format(doc_count))
        # </database_object>
    except Exception as err:
        sys.exit("Error:" + str(err))
    finally:
        # Always release the connection, including on the sys.exit paths.
        if client is not None:
            client.close()


if __name__ == "__main__":
    main()
"""
# <console_result>
Database: adventureworks
Collection: products_new
Document count: 1
Collection: products
Document count: 3
Database: testdb
Collection: mycoll
Document count: 1
# </console_result>
"""
获取数据库对象实例
如果你访问数据库时数据库不存在,适用于 Python 的 PyMongo 驱动程序会创建数据库。 但是,我们建议你改用 MongoDB 扩展命令来管理存储在 Azure Cosmos DB API for MongoDB 中的数据。 该模式已在“数据库是否存在?”部分中演示。
使用 PyMongo 时,可以在 MongoClient 实例上使用属性样式访问或字典样式访问来访问数据库。 有了数据库实例后,可以使用数据库级别操作,如下所示(此示例使用字典样式访问)。
# Dictionary-style access to the database named by db, then list its collection names.
collections = client[db].list_collection_names()
有关使用 PyMongo 驱动程序处理数据库的概述,请参阅数据库级别操作。
删除数据库
使用 MongoClient 的 drop_database 方法从服务器中删除数据库。
# ----------------------------------------------------------------------------------------------------------
# Prerequisites:
#
# 1. An Azure Cosmos DB API for MongoDB Account.
# 2. PyMongo installed.
# 3. python-dotenv installed (to load environment variables from a .env file).
# ----------------------------------------------------------------------------------------------------------
# Code samples are used in documentation, do not change unless synchronizing with documentation.
# ----------------------------------------------------------------------------------------------------------
import os
import sys
import pymongo
from dotenv import load_dotenv
def main():
    """Connect to the API for MongoDB and drop database.

    Loads environment variables with ``load_dotenv``, reads the connection
    string from ``COSMOS_CONNECTION_STRING``, and validates the connection.
    It then prompts for a database name and drops that database when it is
    present in the list of database names; otherwise it reports that the
    database was not found.

    Raises:
        SystemExit: If the connection string is not set, the server cannot
            be reached, or any other error occurs while the sample runs.
    """
    client = None
    try:
        load_dotenv()
        CONNECTION_STRING = os.environ.get("COSMOS_CONNECTION_STRING")
        if not CONNECTION_STRING:
            # MongoClient(None) falls back to localhost instead of failing,
            # so report the missing setting explicitly.
            sys.exit("Error: COSMOS_CONNECTION_STRING is not set")
        client = pymongo.MongoClient(CONNECTION_STRING)

        # Validate the connection before prompting the user so that
        # connection problems are reported by the dedicated handler below.
        try:
            client.server_info()  # validate connection string
        except (
            pymongo.errors.OperationFailure,
            pymongo.errors.ConnectionFailure,
            pymongo.errors.ExecutionTimeout,
        ) as err:
            sys.exit("Can't connect:" + str(err))

        # <drop_database>
        DB_NAME = input("Enter database name to drop: ")
        if DB_NAME in client.list_database_names():
            print("Dropping database: {}".format(DB_NAME))
            client.drop_database(DB_NAME)
        else:
            print("Didn't find database: {}".format(DB_NAME))
        # </drop_database>
    except Exception as err:
        sys.exit("Error:" + str(err))
    finally:
        # Always release the connection, including on the sys.exit paths.
        if client is not None:
            client.close()


if __name__ == "__main__":
    main()
"""
# <console_result>
Enter database name to drop: adventureworks
Dropping database: adventureworks
# </console_result>
"""
前面的代码片段显示的输出类似于以下示例控制台输出:
# ----------------------------------------------------------------------------------------------------------
# Prerequisites:
#
# 1. An Azure Cosmos DB API for MongoDB Account.
# 2. PyMongo installed.
# 3. python-dotenv installed (to load environment variables from a .env file).
# ----------------------------------------------------------------------------------------------------------
# Code samples are used in documentation, do not change unless synchronizing with documentation.
# ----------------------------------------------------------------------------------------------------------
import os
import sys
import pymongo
from dotenv import load_dotenv
def main():
    """Connect to the API for MongoDB and drop database.

    Loads environment variables with ``load_dotenv``, reads the connection
    string from ``COSMOS_CONNECTION_STRING``, and validates the connection.
    It then prompts for a database name and drops that database when it is
    present in the list of database names; otherwise it reports that the
    database was not found.

    Raises:
        SystemExit: If the connection string is not set, the server cannot
            be reached, or any other error occurs while the sample runs.
    """
    client = None
    try:
        load_dotenv()
        CONNECTION_STRING = os.environ.get("COSMOS_CONNECTION_STRING")
        if not CONNECTION_STRING:
            # MongoClient(None) falls back to localhost instead of failing,
            # so report the missing setting explicitly.
            sys.exit("Error: COSMOS_CONNECTION_STRING is not set")
        client = pymongo.MongoClient(CONNECTION_STRING)

        # Validate the connection before prompting the user so that
        # connection problems are reported by the dedicated handler below.
        try:
            client.server_info()  # validate connection string
        except (
            pymongo.errors.OperationFailure,
            pymongo.errors.ConnectionFailure,
            pymongo.errors.ExecutionTimeout,
        ) as err:
            sys.exit("Can't connect:" + str(err))

        # <drop_database>
        DB_NAME = input("Enter database name to drop: ")
        if DB_NAME in client.list_database_names():
            print("Dropping database: {}".format(DB_NAME))
            client.drop_database(DB_NAME)
        else:
            print("Didn't find database: {}".format(DB_NAME))
        # </drop_database>
    except Exception as err:
        sys.exit("Error:" + str(err))
    finally:
        # Always release the connection, including on the sys.exit paths.
        if client is not None:
            client.close()


if __name__ == "__main__":
    main()
"""
# <console_result>
Enter database name to drop: adventureworks
Dropping database: adventureworks
# </console_result>
"""