使用 Python 管理 MongoDB 数据库

适用对象: MongoDB

Azure Cosmos DB 中的 MongoDB 服务器可从适用于 MongoDB 的常见 Python 包获取,例如:

  • 适用于同步 Python 应用程序且在本文中使用的 PyMongo
  • 适用于异步 Python 应用程序的 Motor

注意

示例代码片段在 GitHub 上作为 Python 项目提供。

为数据库命名

在 Azure Cosmos DB 中,数据库类似于命名空间。 创建数据库时,数据库名称会形成用于访问数据库资源和任何子资源的 URI 段。

下面是命名数据库时的一些快速规则:

  • 数据库名称长度必须介于 3 到 63 个字符之间
  • 数据库名称只能包含小写字母、数字或短划线 (-) 字符。
  • 数据库名称必须以小写字母或数字开头。

创建后,数据库的 URI 采用以下格式:

https://<cosmos-account-name>.documents.azure.cn/dbs/<database-name>

获取数据库实例

数据库保存集合及其文档。 若要访问数据库,请使用 MongoClient 的属性样式访问或字典样式访问。 有关详细信息,请参阅获取数据库

下面的代码片段假设你已创建客户端连接,并且在这些代码片段之后关闭了客户端连接

获取服务器信息

使用 MongoClient 类的 server_info 方法访问服务器信息。 无需指定数据库名称即可获取此信息。 返回的信息特定于 MongoDB,不代表 Azure Cosmos DB 平台本身。

还可以使用 MongoClient.list_database_names 方法列出数据库,并使用 MongoClient.db.command 方法向数据库发出 MongoDB 命令

# ----------------------------------------------------------------------------------------------------------
#  Prerequisites:
#
# 1. An Azure Cosmos DB API for MongoDB Account.
# 2. PyMongo installed.
# 3. python-dotenv installed (to load environment variables from a .env file).
# ----------------------------------------------------------------------------------------------------------
# Code samples are used in documentation, do not change unless synchronizing with documentation.
# ----------------------------------------------------------------------------------------------------------

import os
import sys

import pymongo
from dotenv import load_dotenv


def main():
    """Connect to the API for MongoDB and show admin commands"""

    try:
        load_dotenv()
        CONNECTION_STRING = os.environ.get("COSMOS_CONNECTION_STRING")
        client = pymongo.MongoClient(CONNECTION_STRING)

        # <server_info>
        # Get server information
        for k, v in client.server_info().items():
            print("Key: {} , Value: {}".format(k, v))

        # Get server status of admin database
        print("Server status {}".format(client.admin.command("serverStatus")))

        # List databases
        databases = client.list_database_names()
        print("Databases: {}".format(databases))
        # </server_info>

        try:
            client.server_info()  # validate connection string
        except (
            pymongo.errors.OperationFailure,
            pymongo.errors.ConnectionFailure,
            pymongo.errors.ExecutionTimeout,
        ) as err:
            sys.exit("Can't connect:" + str(err))
    except Exception as err:
        sys.exit("Error:" + str(err))

    # <client_disconnect>
    client.close()
    # </client_disconnect>


if __name__ == "__main__":
    main()

"""
# <console_result>
Key: version , Value: 3.6.0
Key: versionArray , Value: [3, 6, 0, 0]
Key: bits , Value: 64
Key: maxBsonObjectSize , Value: 16777216
Key: ok , Value: 1.0
Server status {'ok': 1.0}
Databases: ['adventureworks']
# </console_result>
"""

前面的代码片段显示的输出类似于以下示例控制台输出:

# ----------------------------------------------------------------------------------------------------------
#  Prerequisites:
#
# 1. An Azure Cosmos DB API for MongoDB Account.
# 2. PyMongo installed.
# 3. python-dotenv installed (to load environment variables from a .env file).
# ----------------------------------------------------------------------------------------------------------
# Code samples are used in documentation, do not change unless synchronizing with documentation.
# ----------------------------------------------------------------------------------------------------------

import os
import sys

import pymongo
from dotenv import load_dotenv


def main():
    """Connect to the API for MongoDB and show admin commands"""

    try:
        load_dotenv()
        CONNECTION_STRING = os.environ.get("COSMOS_CONNECTION_STRING")
        client = pymongo.MongoClient(CONNECTION_STRING)

        # <server_info>
        # Get server information
        for k, v in client.server_info().items():
            print("Key: {} , Value: {}".format(k, v))

        # Get server status of admin database
        print("Server status {}".format(client.admin.command("serverStatus")))

        # List databases
        databases = client.list_database_names()
        print("Databases: {}".format(databases))
        # </server_info>

        try:
            client.server_info()  # validate connection string
        except (
            pymongo.errors.OperationFailure,
            pymongo.errors.ConnectionFailure,
            pymongo.errors.ExecutionTimeout,
        ) as err:
            sys.exit("Can't connect:" + str(err))
    except Exception as err:
        sys.exit("Error:" + str(err))

    # <client_disconnect>
    client.close()
    # </client_disconnect>


if __name__ == "__main__":
    main()

"""
# <console_result>
Key: version , Value: 3.6.0
Key: versionArray , Value: [3, 6, 0, 0]
Key: bits , Value: 64
Key: maxBsonObjectSize , Value: 16777216
Key: ok , Value: 1.0
Server status {'ok': 1.0}
Databases: ['adventureworks']
# </console_result>
"""

数据库是否存在?

如果你访问数据库时数据库不存在,适用于 Python 的 PyMongo 驱动程序会创建数据库。 但是,我们建议你改用 MongoDB 扩展命令来管理存储在 Azure Cosmos DB API for MongoDB 中的数据。 若要创建新数据库(如果数据库不存在),请使用创建数据库扩展,如以下代码片段所示。

若要在使用数据库之前查看数据库是否已存在,请使用 list_database_names 方法获取当前数据库的列表。

# ----------------------------------------------------------------------------------------------------------
#  Prerequisites:
#
# 1. An Azure Cosmos DB API for MongoDB Account.
# 2. PyMongo installed.
# 3. python-dotenv installed (to load environment variables from a .env file).
# ----------------------------------------------------------------------------------------------------------
# Code samples are used in documentation, do not change unless synchronizing with documentation.
# ----------------------------------------------------------------------------------------------------------

import os
import sys

import pymongo
from dotenv import load_dotenv


def main():
    """Connect to the API for MongoDB and check if database exists"""

    try:
        load_dotenv()
        CONNECTION_STRING = os.environ.get("COSMOS_CONNECTION_STRING")
        client = pymongo.MongoClient(CONNECTION_STRING)

        # <does_database_exist> 
        # Get list of databases
        databases = client.list_database_names()
        if not databases:
            print("No databases found")

        # Does database exist?
        DB_NAME = "adventureworks"
        if DB_NAME in databases:
            print("Database exists: {}".format(DB_NAME))
        else:
            client[DB_NAME].command({"customAction": "CreateDatabase", "offerThroughput": 400})
            print("Created db '{}' with shared throughput.\n".format(DB_NAME))
        # </does_database_exist>

        # <does_collection_exist>
        COLL_NAME = "products"
        if COLL_NAME in client[DB_NAME].list_collection_names():
            print("Collection exists: {}".format(COLL_NAME))
        else:
            client[DB_NAME].command({"customAction": "CreateCollection", "collection": COLL_NAME})
            print("Created collection '{}'.\n".format(COLL_NAME))
        # </does_collection_exist>

        try:
            client.server_info()  # validate connection string
        except (
            pymongo.errors.OperationFailure,
            pymongo.errors.ConnectionFailure,
            pymongo.errors.ExecutionTimeout,
        ) as err:
            sys.exit("Can't connect:" + str(err))
    except Exception as err:
        sys.exit("Error:" + str(err))

    client.close()

if __name__ == "__main__":
    main()

"""
# <console_result>
Database exists: adventureworks
# </console_result>
"""

前面的代码片段显示的输出类似于以下示例控制台输出:

# ----------------------------------------------------------------------------------------------------------
#  Prerequisites:
#
# 1. An Azure Cosmos DB API for MongoDB Account.
# 2. PyMongo installed.
# 3. python-dotenv installed (to load environment variables from a .env file).
# ----------------------------------------------------------------------------------------------------------
# Code samples are used in documentation, do not change unless synchronizing with documentation.
# ----------------------------------------------------------------------------------------------------------

import os
import sys

import pymongo
from dotenv import load_dotenv


def main():
    """Connect to the API for MongoDB and check if database exists"""

    try:
        load_dotenv()
        CONNECTION_STRING = os.environ.get("COSMOS_CONNECTION_STRING")
        client = pymongo.MongoClient(CONNECTION_STRING)

        # <does_database_exist> 
        # Get list of databases
        databases = client.list_database_names()
        if not databases:
            print("No databases found")

        # Does database exist?
        DB_NAME = "adventureworks"
        if DB_NAME in databases:
            print("Database exists: {}".format(DB_NAME))
        else:
            client[DB_NAME].command({"customAction": "CreateDatabase", "offerThroughput": 400})
            print("Created db '{}' with shared throughput.\n".format(DB_NAME))
        # </does_database_exist>

        # <does_collection_exist>
        COLL_NAME = "products"
        if COLL_NAME in client[DB_NAME].list_collection_names():
            print("Collection exists: {}".format(COLL_NAME))
        else:
            client[DB_NAME].command({"customAction": "CreateCollection", "collection": COLL_NAME})
            print("Created collection '{}'.\n".format(COLL_NAME))
        # </does_collection_exist>

        try:
            client.server_info()  # validate connection string
        except (
            pymongo.errors.OperationFailure,
            pymongo.errors.ConnectionFailure,
            pymongo.errors.ExecutionTimeout,
        ) as err:
            sys.exit("Can't connect:" + str(err))
    except Exception as err:
        sys.exit("Error:" + str(err))

    client.close()

if __name__ == "__main__":
    main()

"""
# <console_result>
Database exists: adventureworks
# </console_result>
"""

获取数据库、集合和文档计数的列表

以编程方式管理 MongoDB 服务器时,了解服务器上的数据库和集合以及每个集合中的文档数很有帮助。 有关详细信息,请参阅:

# ----------------------------------------------------------------------------------------------------------
#  Prerequisites:
#
# 1. An Azure Cosmos DB API for MongoDB Account.
# 2. PyMongo installed.
# 3. python-dotenv installed (to load environment variables from a .env file).
# ----------------------------------------------------------------------------------------------------------
# Code samples are used in documentation, do not change unless synchronizing with documentation.
# ----------------------------------------------------------------------------------------------------------

import os
import sys

import pymongo
from dotenv import load_dotenv


def main():
    """Connect to the API for MongoDB and get document count."""

    try:
        load_dotenv()
        CONNECTION_STRING = os.environ.get("COSMOS_CONNECTION_STRING")
        client = pymongo.MongoClient(CONNECTION_STRING)

        # <database_object>
        # Get list of databases
        databases = client.list_database_names()

        # Loop through databases
        for db in databases:
            print("Database: {}".format(db))

            # Get list of collections
            collections = client[db].list_collection_names()

            # Loop through collections
            for col in collections:
                print("\tCollection: {}".format(col))

                # Get document count
                doc_count = client[db][col].count_documents({})
                print("\tDocument count: {}".format(doc_count))
        # </database_object>

        try:
            client.server_info()  # validate connection string
        except (
            pymongo.errors.OperationFailure,
            pymongo.errors.ConnectionFailure,
            pymongo.errors.ExecutionTimeout,
        ) as err:
            sys.exit("Can't connect:" + str(err))
    except Exception as err:
        sys.exit("Error:" + str(err))

    client.close()

if __name__ == "__main__":
    main()

"""
# <console_result>
Database: adventureworks
        Collection: products_new
        Document count: 1
        Collection: products
        Document count: 3
Database: testdb
        Collection: mycoll
        Document count: 1
# </console_result>
"""

前面的代码片段显示的输出类似于以下示例控制台输出:

# ----------------------------------------------------------------------------------------------------------
#  Prerequisites:
#
# 1. An Azure Cosmos DB API for MongoDB Account.
# 2. PyMongo installed.
# 3. python-dotenv installed (to load environment variables from a .env file).
# ----------------------------------------------------------------------------------------------------------
# Code samples are used in documentation, do not change unless synchronizing with documentation.
# ----------------------------------------------------------------------------------------------------------

import os
import sys

import pymongo
from dotenv import load_dotenv


def main():
    """Connect to the API for MongoDB and get document count."""

    try:
        load_dotenv()
        CONNECTION_STRING = os.environ.get("COSMOS_CONNECTION_STRING")
        client = pymongo.MongoClient(CONNECTION_STRING)

        # <database_object>
        # Get list of databases
        databases = client.list_database_names()

        # Loop through databases
        for db in databases:
            print("Database: {}".format(db))

            # Get list of collections
            collections = client[db].list_collection_names()

            # Loop through collections
            for col in collections:
                print("\tCollection: {}".format(col))

                # Get document count
                doc_count = client[db][col].count_documents({})
                print("\tDocument count: {}".format(doc_count))
        # </database_object>

        try:
            client.server_info()  # validate connection string
        except (
            pymongo.errors.OperationFailure,
            pymongo.errors.ConnectionFailure,
            pymongo.errors.ExecutionTimeout,
        ) as err:
            sys.exit("Can't connect:" + str(err))
    except Exception as err:
        sys.exit("Error:" + str(err))

    client.close()

if __name__ == "__main__":
    main()

"""
# <console_result>
Database: adventureworks
        Collection: products_new
        Document count: 1
        Collection: products
        Document count: 3
Database: testdb
        Collection: mycoll
        Document count: 1
# </console_result>
"""

获取数据库对象实例

如果你访问数据库时数据库不存在,适用于 Python 的 PyMongo 驱动程序会创建数据库。 但是,我们建议你改用 MongoDB 扩展命令来管理存储在 Azure Cosmos DB API for MongoDB 中的数据。 上述模式显示在数据库是否存在?部分。

使用 PyMongo 时,可以在 MongoClient 实例上使用属性样式访问来访问数据库。 有了数据库实例后,可以使用数据库级别操作,如下所示。

collections = client[db].list_collection_names()

有关使用 PyMongo 驱动程序处理数据库的概述,请参阅数据库级别操作

删除数据库

使用 MongoClient 的 drop_database 方法从服务器中删除数据库。

# ----------------------------------------------------------------------------------------------------------
#  Prerequisites:
#
# 1. An Azure Cosmos DB API for MongoDB Account.
# 2. PyMongo installed.
# 3. python-dotenv installed (to load environment variables from a .env file).
# ----------------------------------------------------------------------------------------------------------
# Code samples are used in documentation, do not change unless synchronizing with documentation.
# ----------------------------------------------------------------------------------------------------------

import os
import sys

import pymongo
from dotenv import load_dotenv


def main():
    """Connect to the API for MongoDB and drop database."""

    try:
        load_dotenv()
        CONNECTION_STRING = os.environ.get("COSMOS_CONNECTION_STRING")
        client = pymongo.MongoClient(CONNECTION_STRING)

        # <drop_database>
        DB_NAME = input("Enter database name to drop: ")
        if DB_NAME in client.list_database_names():
            print("Dropping database: {}".format(DB_NAME))
            client.drop_database(DB_NAME)
        else:
            print("Didn't find database: {}".format(DB_NAME))
        # </drop_database>

        try:
            client.server_info()  # validate connection string
        except (
            pymongo.errors.OperationFailure,
            pymongo.errors.ConnectionFailure,
            pymongo.errors.ExecutionTimeout,
        ) as err:
            sys.exit("Can't connect:" + str(err))
    except Exception as err:
        sys.exit("Error:" + str(err))

    client.close()


if __name__ == "__main__":
    main()

"""
# <console_result>
Enter database name to drop: adventureworks
Dropping database: adventureworks
# </console_result>
"""

前面的代码片段显示的输出类似于以下示例控制台输出:

# ----------------------------------------------------------------------------------------------------------
#  Prerequisites:
#
# 1. An Azure Cosmos DB API for MongoDB Account.
# 2. PyMongo installed.
# 3. python-dotenv installed (to load environment variables from a .env file).
# ----------------------------------------------------------------------------------------------------------
# Code samples are used in documentation, do not change unless synchronizing with documentation.
# ----------------------------------------------------------------------------------------------------------

import os
import sys

import pymongo
from dotenv import load_dotenv


def main():
    """Connect to the API for MongoDB and drop database."""

    try:
        load_dotenv()
        CONNECTION_STRING = os.environ.get("COSMOS_CONNECTION_STRING")
        client = pymongo.MongoClient(CONNECTION_STRING)

        # <drop_database>
        DB_NAME = input("Enter database name to drop: ")
        if DB_NAME in client.list_database_names():
            print("Dropping database: {}".format(DB_NAME))
            client.drop_database(DB_NAME)
        else:
            print("Didn't find database: {}".format(DB_NAME))
        # </drop_database>

        try:
            client.server_info()  # validate connection string
        except (
            pymongo.errors.OperationFailure,
            pymongo.errors.ConnectionFailure,
            pymongo.errors.ExecutionTimeout,
        ) as err:
            sys.exit("Can't connect:" + str(err))
    except Exception as err:
        sys.exit("Error:" + str(err))

    client.close()


if __name__ == "__main__":
    main()

"""
# <console_result>
Enter database name to drop: adventureworks
Dropping database: adventureworks
# </console_result>
"""

另请参阅