使用 IoT 中心将文件从设备上传到云 (Python)
本文演示 IoT 中心的文件上传功能如何使用 Python 将文件上传到 Azure Blob 存储。
将遥测数据从设备发送到 IoT 中心快速入门和使用 IoT 中心发送云到设备的消息一文介绍了 IoT 中心提供的基本的设备到云和云到设备的消息传送功能。 使用 IoT 中心配置消息路由教程介绍了一种在 Azure Blob 存储中可靠存储设备到云消息的方法。 但是,在某些情况下,无法轻松地将设备发送的数据映射为 IoT 中心接受的相对较小的设备到云消息。 例如:
- 视频
- 包含图像的大型文件
- 以高频率采样的振动数据
- 某种形式的预处理数据。
通常使用 Azure 数据工厂或 Hadoop 堆栈等工具在云中批处理这些文件。 需要从设备上传文件时,仍可以使用 IoT 中心的安全性和可靠性。 本文介绍如何进行此操作。
本文结束时,你将运行 Python 控制台应用 FileUpload.py,该应用使用 Python 设备 SDK 将文件上传到存储。
注意
IoT 中心通过 Azure IoT 设备 SDK 来支持许多设备平台和语言(包括 C、Java、Python 和 JavaScript)。 请参阅 Azure IoT 开发人员中心,了解如何将设备连接到 Azure IoT 中心。
重要
使用 X.509 证书颁发机构 (CA) 身份验证的设备上的文件上传功能为公共预览版,并且必须启用预览模式。 这是在一同使用 X.509 指纹身份验证或 X.509 证书证明与 Azure 设备预配服务的设备上的正式发布版本。 若要了解有关使用 IoT 中心进行 X.509 身份验证的详细信息,请参阅支持的 X.509 证书。
先决条件
有效的 Azure 帐户。 (如果没有帐户,可以创建一个试用帐户,只需几分钟即可完成。)
已注册的设备。 在 Azure 门户中注册一个。
建议使用 Python 版本 3.7 或更高版本。 请确保根据安装程序的要求,使用 32 位或 64 位安装。 在安装过程中出现提示时,请确保将 Python 添加到特定于平台的环境变量中。
端口 8883 在防火墙中须处于打开状态。 本文中的设备示例使用 MQTT 协议,该协议通过端口 8883 进行通信。 在某些公司和教育网络环境中,此端口可能被阻止。 有关解决此问题的更多信息和方法,请参阅连接到 IoT 中心(MQTT)。
将 Azure 存储帐户关联到 IoT 中心
若要从设备上传文件,必须拥有与 IoT 中心关联的 Azure 存储帐户和 Azure Blob 存储容器。 将存储帐户和容器与 IoT 中心关联后,IoT 中心可以在设备请求时提供 SAS URI 的元素。 然后,设备可以使用这些元素来构造 SAS URI,并将其用于向 Azure 存储进行身份验证,以及上传文件到 blob 容器。
将 Azure 存储帐户与 IoT 中心关联:
在“中心设置”下,选择 IoT 中心左侧窗格上的“文件上传”。
在“文件上传”窗格中,选择“Azure 存储容器” 。 对于本文,建议你的存储帐户和 IoT 中心位于同一区域。
如果你已有要使用的存储帐户,请从列表中选择它。
若要创建新的存储帐户,请选择“+存储帐户”。 为该存储帐户命名,并确保其位置设置为 IoT 中心的同一区域,然后选择“确定” 。 将在 IoT 中心的同一资源组中创建新帐户。 部署完成后,从列表中选择“存储帐户”。
选择“存储帐户”后,将打开“容器”窗格。
在“容器”窗格中,选择“blob 容器”。
如果已有要使用的 blob 容器,请从列表中选择它,然后单击“选择”。
然后选择“新建容器”以创建新的 blob 容器。 为新容器命名。 出于本文的目的,可以将其他所有字段保留为默认值。 选择“创建”。 部署完成后,从列表中选择“容器”,然后单击“选择”。
回到“文件上传”窗格,确保文件通知设置为“开启” 。 可将其他所有设置保留默认值。 选择“保存”并等待设置完成,然后进入下一部分。
有关如何创建 Azure 存储帐户的详细说明,请参阅创建存储帐户。 若要了解如何将存储帐户以及 blob 容器与 IoT 中心关联的详细信息,请参阅使用 Azure 门户配置文件上传。
从设备应用上传文件
本部分中操作将会创建可将文件上传到 IoT 中心的设备应用。
在命令提示符处,运行以下命令来安装 azure-iot-device 包。 可以使用此包与 IoT 中心协调文件上传操作。
pip install azure-iot-device
在命令提示符处,运行以下命令来安装 azure.storage.blob 包。 可以使用此包执行文件上传操作。
pip install azure.storage.blob
创建将上传到 blob 存储的测试文件。
使用文本编辑器,在工作文件夹中创建一个 FileUpload.py 文件 。
在 FileUpload.py 文件的开头添加以下
import
语句和变量 。import os from azure.iot.device import IoTHubDeviceClient from azure.core.exceptions import AzureError from azure.storage.blob import BlobClient CONNECTION_STRING = "[Device Connection String]" PATH_TO_FILE = r"[Full path to local file]"
在文件中,将
[Device Connection String]
替换为 IoT 中心设备的连接字符串。 将[Full path to local file]
替换为你创建的测试文件的路径,或是设备上要上传的任何文件的路径。创建一个函数以将文件上传到 blob 存储:
def store_blob(blob_info, file_name): try: sas_url = "https://{}/{}/{}{}".format( blob_info["hostName"], blob_info["containerName"], blob_info["blobName"], blob_info["sasToken"] ) print("\nUploading file: {} to Azure Storage as blob: {} in container {}\n".format(file_name, blob_info["blobName"], blob_info["containerName"])) # Upload the specified file with BlobClient.from_blob_url(sas_url) as blob_client: with open(file_name, "rb") as f: result = blob_client.upload_blob(f, overwrite=True) return (True, result) except FileNotFoundError as ex: # catch file not found and add an HTTP status code to return in notification to IoT Hub ex.status_code = 404 return (False, ex) except AzureError as ex: # catch Azure errors that might result from the upload operation return (False, ex)
此函数分析传递给它的 blob_info 结构,以创建用于初始化 azure.storage.blob.BlobClient 的 URL。 然后,它使用此客户端将文件上传到 Azure Blob 存储。
添加下述用来连接客户端并上传文件的代码:
def run_sample(device_client): # Connect the client device_client.connect() # Get the storage info for the blob blob_name = os.path.basename(PATH_TO_FILE) storage_info = device_client.get_storage_info_for_blob(blob_name) # Upload to blob success, result = store_blob(storage_info, PATH_TO_FILE) if success == True: print("Upload succeeded. Result is: \n") print(result) print() device_client.notify_blob_upload_status( storage_info["correlationId"], True, 200, "OK: {}".format(PATH_TO_FILE) ) else : # If the upload was not successful, the result is the exception object print("Upload failed. Exception is: \n") print(result) print() device_client.notify_blob_upload_status( storage_info["correlationId"], False, result.status_code, str(result) ) def main(): device_client = IoTHubDeviceClient.create_from_connection_string(CONNECTION_STRING) try: print ("IoT Hub file upload sample, press Ctrl-C to exit") run_sample(device_client) except KeyboardInterrupt: print ("IoTHubDeviceClient sample stopped") finally: # Graceful exit device_client.shutdown() if __name__ == "__main__": main()
此代码创建一个 IoTHubDeviceClient,并使用以下 API 来管理通过 IoT 中心进行的文件上传操作:
get_storage_info_for_blob 从 IoT 中心获取你之前创建的链接存储帐户的相关信息。 此信息包括主机名、容器名称、blob 名称和 SAS 令牌。 存储信息传递到 store_blob 函数(在上一步中创建),因此该函数中的 BlobClient 可以通过 Azure 存储进行身份验证。 get_storage_info_for_blob 方法还会返回一个 correlation_id,后者在 notify_blob_upload_status 方法中使用。 IoT 中心使用 correlation_id 来标记你在处理的 blob。
notify_blob_upload_status 向 IoT 中心通知你的 blob 存储操作的状态。 你将向其传递通过 get_storage_info_for_blob 方法获取的 correlation_id。 IoT 中心使用它来通知任何可能正在侦听文件上传任务状态通知的服务。
保存并关闭 FileUpload.py 文件。
运行应用程序
现即可运行应用程序。
后续步骤
在本文中,你已学习了如何使用 IoT 中心的文件上传功能来简化从设备进行的文件上传。 可以继续通过以下文章了解此功能:
使用以下链接详细了解 Azure Blob 存储: