Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
使用Python为Azure Functions进行开发时,需要了解函数的执行方式以及该性能如何影响函数应用缩放的方式。 设计高性能应用时,需求更为重要。 设计、编写和配置函数应用时要考虑的主要因素是水平缩放和吞吐量性能配置。
水平缩放
默认情况下,Azure Functions会自动监视应用程序的负载,并根据需要为Python创建更多主机实例。 Azure Functions使用不同的触发器类型的内置阈值来决定何时添加实例,例如 QueueTrigger 的消息期限和队列大小。 这些阈值不可由用户配置。 有关详细信息,请参阅 Azure Functions 中的事件驱动缩放。
提高吞吐量性能
默认配置适用于大多数Azure Functions应用程序。 但是,可以通过使用基于工作负荷配置文件的配置来提高应用程序吞吐量的性能。 第一步是了解正在运行的工作负荷的类型。
| 工作负荷类型 | 函数应用特征 | 示例 |
|---|---|---|
| I/O 绑定 | • 应用需要处理许多并发调用。 • 应用处理大量 I/O 事件,例如网络调用和磁盘读/写。 |
• Web API |
| CPU 限制型 | • 应用执行长时间运行的计算,例如图像大小调整。 • 应用执行数据转换。 |
•数据处理 • 机器学习推理 |
由于实际函数工作负载通常是 I/O 和 CPU 绑定的混合,因此应在实际生产负载下分析应用。
性能专用配置
了解函数应用的工作负荷配置文件后,可以使用以下配置来提高函数的吞吐量性能。
异步
由于 Python 是单线程运行时,因此默认情况下,Python的主机实例一次只能处理一个函数调用。 对于处理大量 I/O 事件和/或受 I/O 绑定的应用程序,可以通过异步运行函数来显著提高性能。
若要异步运行函数,请使用 async def 语句,该语句直接使用 asyncio 运行函数:
async def main():
await some_nonblocking_socket_io_op()
下面是使用 aiohttp http 客户端的 HTTP 触发器的函数示例:
import aiohttp
import azure.functions as func
async def main(req: func.HttpRequest) -> func.HttpResponse:
async with aiohttp.ClientSession() as client:
async with client.get("PUT_YOUR_URL_HERE") as response:
return func.HttpResponse(await response.text())
return func.HttpResponse(body='NotFound', status_code=404)
没有关键字的 async 函数会自动在 ThreadPoolExecutor 线程池中运行:
# Runs in a ThreadPoolExecutor threadpool. Number of threads is defined by PYTHON_THREADPOOL_THREAD_COUNT.
# The example is intended to show how default synchronous functions are handled.
def main():
some_blocking_socket_io()
为了实现异步运行函数的全部优势,代码中使用的 I/O作/库也需要实现异步。 在被定义为异步的函数中使用同步 I/O 可能会损害 整体性能。 如果使用的库未实现异步版本,则仍可以通过在应用中 管理事件循环 来异步运行代码中获益。
下面是已实现异步模式的客户端库的几个示例:
- aiohttp - asyncio 的 Http 客户端/服务器
- 流 API - 用于网络连接的高级异步/await 就绪基元
- Janus Queue - Python线程安全且支持asyncio的队列
- pyzmq - ZeroMQ 的Python绑定
了解 Python 工作线程中的异步
在函数签名之前定义 async 时,Python 将该函数标记为协程。 调用协程时,它可以被调度为任务,进入事件循环。 在异步函数中调用 await 时,它会将延续注册到事件循环中,该循环允许事件循环在等待期间处理下一个任务。
在我们的 Python Worker 中,辅助角色与客户的 async 函数共享事件循环,并且能够同时处理多个请求。 我们强烈建议客户使用 aiohttp 和 pyzmq 等与 asyncio 兼容的库。 遵循这些建议可提高函数的吞吐量,与同步实现时这些库相比。
注释
如果函数声明为 async,且其实现内没有任何await,则函数的性能将受到严重影响,因为将阻止事件循环,从而禁止Python辅助角色处理并发请求。
使用多个语言处理进程
默认情况下,每个 Functions 主机实例都有一个语言工作进程。 可以使用应用程序设置增加每个主机的工作进程数(最多 10 FUNCTIONS_WORKER_PROCESS_COUNT 个)。 然后,Azure Functions 尝试在这些辅助角色之间均匀分配同时发出的函数调用。
对于 CPU 绑定应用,应将语言辅助角色数设置为与每个函数应用可用的内核数相同或更高。 若要了解详细信息,请参阅 可用实例 SKU。
I/O 绑定的应用的数量也可能会得益于工作进程数增加到超过可用的内核数。 请记住,由于所需上下文切换的数量增加,设置工作线程数量过高可能会影响整体性能。
FUNCTIONS_WORKER_PROCESS_COUNT适用于当扩大应用程序的规模以满足需求时,Azure Functions 创建的每个主机。
在语言工作进程中设置最大工作线程
如异步部分中所述,Python 语言工作者以不同的方式处理函数和协程。 协程在运行语言工作器的同一事件循环中运行。 另一方面,函数调用在 ThreadPoolExecutor 中运行,该线程由语言辅助角色作为线程维护。
可以使用 PYTHON_THREADPOOL_THREAD_COUNT 应用程序设置来设置允许运行同步函数的最大线程数量。 此值设置 ThreadPoolExecutor 对象的 max_worker 参数,该参数允许Python最多使用 max_worker 线程池异步执行调用。
PYTHON_THREADPOOL_THREAD_COUNT适用于 Functions 主机进程创建的每个工作线程,Python 负责决定何时创建新线程或重用现有的空闲线程。 对于较旧的Python版本(即 3.8、3.7 和 3.6),max_worker值设置为 1。 对于 Python 版本 3.9,max_worker 设置为 None。
对于 CPU 密集型应用,您应该把设置值保持在较低水平,从 1 开始设置,并在尝试不同的工作负载时逐步增加。 此建议是减少在上下文切换上花费的时间,并允许 CPU 绑定的任务完成。
对于 I/O 绑定的应用,你应通过增加在每次调用上工作的线程数来获得显著的提升。 建议从Python默认(核心数)+ 4 开始,然后根据看到的吞吐量值进行调整。
对于混合工作负荷应用,应同时平衡 FUNCTIONS_WORKER_PROCESS_COUNT 和 PYTHON_THREADPOOL_THREAD_COUNT 配置,以最大程度地提高吞吐量。 若要了解函数应用花费的时间最多,建议分析这些应用并根据其行为设置值。 若要了解这些应用程序设置,请参阅 “使用多个工作进程”。
注释
尽管这些建议同时适用于 HTTP 和非 HTTP 触发函数,但可能需要调整非 HTTP 触发函数的其他触发器特定配置,以便从函数应用获取预期性能。 有关此的更多信息,请参阅此 可靠的 Azure 函数最佳实践。
管理事件循环
应使用与 asyncio 兼容的第三方库。 如果第三方库都不符合你的需求,则还可以管理Azure Functions中的事件循环。 通过管理事件循环,可以更灵活地进行计算资源管理,还可以将同步 I/O 库包装到协同例程中。
有许多有用的官方 Python 文档,通过使用内置的 asyncio 库来讨论 Coroutines 和 Tasks 以及 Event Loop。
以以下 请求 库为例,此代码片段使用 asyncio 库将方法 requests.get() 包装成一个协同例程,同时运行多个 Web 请求到 SAMPLE_URL。
import asyncio
import json
import logging
import azure.functions as func
from time import time
from requests import get, Response
async def invoke_get_request(eventloop: asyncio.AbstractEventLoop) -> Response:
# Wrap requests.get function into a coroutine
single_result = await eventloop.run_in_executor(
None, # using the default executor
get, # each task call invoke_get_request
'SAMPLE_URL' # the url to be passed into the requests.get function
)
return single_result
async def main(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python HTTP trigger function processed a request.')
eventloop = asyncio.get_event_loop()
# Create 10 tasks for requests.get synchronous call
tasks = [
asyncio.create_task(
invoke_get_request(eventloop)
) for _ in range(10)
]
done_tasks, _ = await asyncio.wait(tasks)
status_codes = [d.result().status_code for d in done_tasks]
return func.HttpResponse(body=json.dumps(status_codes),
mimetype='application/json')
纵向扩展
通过升级到具有更高规格的高级计划,您可以获取更多处理单元,尤其是在 CPU 绑定操作中。 使用更高的处理单元,可以根据可用的内核数调整工作进程计数,并实现更高的并行度。
后续步骤
有关Azure Functions Python开发的详细信息,请参阅以下资源: