批量导入和导出 IoT 中心设备标识

每个 IoT 中心都有一个标识注册表,可用于在服务中创建设备资源。 标识注册表还可用于控制对面向设备的终结点的访问。 本文介绍如何使用适用于 .NET 的 Azure IoT SDK 附带的 ImportExportDeviceSample 示例,在标识注册表中批量导入和导出设备标识。 如需详细了解在将 IoT 中心迁移到其他区域时如何使用此功能,请参阅如何使用 Azure 资源管理器模板手动迁移 Azure IoT 中心

注意

IoT 中心最近在数量有限的区域中添加了虚拟网络支持。 此功能保护了导入和导出操作的安全,无需传递密钥即可进行身份验证。 若要详细了解虚拟网络支持以及用于实现该支持的 API 调用,请参阅 IoT 中心对虚拟网络的支持

导入和导出操作在作业的上下文中发生,让你能够对 IoT 中心执行批量服务操作。

SDK 中的 RegistryManager 类包括使用“作业”框架的 ExportDevicesAsync 和 ImportDevicesAsync 方法。 这些方法可以导出、导入和同步整个 IoT 中心标识注册表。

本文讨论如何使用 RegistryManager 类和作业系统执行设备到 IoT 中心的标识注册表的批量导入,以及从 IoT 中心的标识注册表到设备的批量导出。 还可以使用 Azure IoT 中心设备预配服务实现零接触即时预配到一个或多个 IoT 中心。 若要了解详细信息,请参阅预配服务文档

注意

本文中的一些代码片段包含在适用于 .NET 的 Azure IoT SDK 随附的 ImportExportDevicesSample 服务示例中。 该示例位于 SDK 的 /iothub/service/samples/how to guides/ImportExportDevicesSample 文件夹中,在指定时,代码片段包含在该 SDK 示例的 ImportExportDevicesSample.cs 文件中。 有关适用于 .NET 的 Azure IoT SDK 中包含的 ImportExportDevicesSample 示例和其他服务示例的详细信息,请参阅适用于 C# 的 Azure IoT 中心服务示例

什么是作业?

当操作出现以下情况时,标识注册表操作会使用作业系统:

  • 相较标准运行时操作,其执行时间可能很长。

  • 向用户返回大量数据。

操作将以异步方式为该 IoT 中心创建作业,而非在操作结果处进行单一的 API 调用等待或阻塞。 然后,操作立即返回 JobProperties 对象。

以下 C# 代码段演示如何创建导出作业:

// Call an export job on the IoT hub to retrieve all devices
JobProperties exportJob = await 
  registryManager.ExportDevicesAsync(containerSasUri, false);

注意

若要在 C# 代码中使用 RegistryManager 类,请将 Microsoft.Azure.Devices NuGet 包添加到项目。 RegistryManager 类位于 Microsoft.Azure.Devices 命名空间。

可使用 RegistryManager 类,查询使用返回的 JobProperties 元数据的作业的状态。 若要创建 RegistryManager 类的实例,请使用 CreateFromConnectionString 方法 。

RegistryManager registryManager =
  RegistryManager.CreateFromConnectionString("{your IoT Hub connection string}");

若要查找 IoT 中心的连接字符串,请在 Azure 门户中执行以下操作:

  1. 导航到 IoT 中心。

  2. 选择“共享访问策略”。

  3. 选择一个策略(考虑到所需的权限)。

  4. 复制该策略的连接字符串。

以下 C# 代码片段(来自 SDK 示例中的 WaitForJobAsync 方法)演示如何每五秒轮询一次以查看作业是否已完成执行:

// Wait until job is finished
while (true)
{
    job = await registryManager.GetJobAsync(job.JobId);
    if (job.Status == JobStatus.Completed
        || job.Status == JobStatus.Failed
        || job.Status == JobStatus.Cancelled)
    {
        // Job has finished executing
        break;
    }
    Console.WriteLine($"\tJob status is {job.Status}...");

    await Task.Delay(TimeSpan.FromSeconds(5));
}

注意

如果你的存储帐户具有限制 IoT 中心连接的防火墙配置,请考虑使用 Microsoft 受信任第一方例外情况(在选定区域的提供托管服务标识的 IoT 中心中适用)。

设备导入/导出作业限制

所有 IoT 中心层级一次只允许一个活动的设备导入或导出作业。 IoT 中心也限制作业操作的速率。 若要了解详细信息,请参阅 IoT 中心配额和限制

导出设备

使用 ExportDevicesAsync 方法,将整个 IoT 中心标识注册表导出到使用共享访问签名 (SAS) 的 Azure 存储 Blob 容器。 使用此方法可在所控制的 Blob 容器中创建可靠的设备信息备份。

ExportDevicesAsync 方法需要两个参数:

  • 包含 Blob 容器 URI 的 字符串 。 此 URI 必须包含可授予容器写入权限的 SAS 令牌。 作业在此容器中创建用于存储序列化导出设备数据的块 Blob。 SAS 令牌必须包含这些权限:

    SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.Read 
       | SharedAccessBlobPermissions.Delete
    
  • 指示你是否要在导出数据中排除身份验证密钥的 布尔值。 如果为 false,则身份验证密钥包含在导出输出中。 否则,密钥导出为 null

下面的 C# 代码段演示了如何启动在导出数据中包含设备身份验证密钥的导出作业,并对完成情况进行轮询:

// Call an export job on the IoT Hub to retrieve all devices
JobProperties exportJob = 
  await registryManager.ExportDevicesAsync(containerSasUri, false);

// Wait until job is finished
while(true)
{
    exportJob = await registryManager.GetJobAsync(exportJob.JobId);
    if (exportJob.Status == JobStatus.Completed || 
        exportJob.Status == JobStatus.Failed ||
        exportJob.Status == JobStatus.Cancelled)
    {
    // Job has finished executing
    break;
    }

    await Task.Delay(TimeSpan.FromSeconds(5));
}

可以在 SDK 示例中的 ExportDevicesAsync 方法中找到类似的代码。 该作业将其输出以名为 devices.txt 的块 Blob 的形式存储在提供的 Blob 容器中。 输出数据包含 JSON 序列化设备数据,每行代表一个设备。

以下示例显示输出数据:

{"id":"Device1","eTag":"MA==","status":"enabled","authentication":{"symmetricKey":{"primaryKey":"abc=","secondaryKey":"def="}}}
{"id":"Device2","eTag":"MA==","status":"enabled","authentication":{"symmetricKey":{"primaryKey":"abc=","secondaryKey":"def="}}}
{"id":"Device3","eTag":"MA==","status":"disabled","authentication":{"symmetricKey":{"primaryKey":"abc=","secondaryKey":"def="}}}
{"id":"Device4","eTag":"MA==","status":"disabled","authentication":{"symmetricKey":{"primaryKey":"abc=","secondaryKey":"def="}}}
{"id":"Device5","eTag":"MA==","status":"enabled","authentication":{"symmetricKey":{"primaryKey":"abc=","secondaryKey":"def="}}}

如果设备具有孪生数据,则孪生数据也将随设备数据一起导出。 以下示例显示了此格式。 从“twinETag”行开始直至结尾的所有数据都是孪生数据。

{
   "id":"export-6d84f075-0",
   "eTag":"MQ==",
   "status":"enabled",
   "authentication":null,
   "twinETag":"AAAAAAAAAAI=",
   "tags":{
      "Location":"LivingRoom"
   },
   "properties":{
      "desired":{
         "Thermostat":{
            "Temperature":75.1,
            "Unit":"F"
         },
      },
      "reported":{}
   }
}

如果需要在代码中访问此数据,可以使用 ExportImportDevice 类反序列化此数据。 以下 C# 代码片段(来自 SDK 示例中的 ReadFromBlobAsync 方法)演示如何读取以前从 ExportImportDevice 导出到 BlobClient 实例的设备信息:

private static async Task<List<string>> ReadFromBlobAsync(BlobClient blobClient)
{
    // Read the blob file of devices, import each row into a list.
    var contents = new List<string>();

    using Stream blobStream = await blobClient.OpenReadAsync();
    using var streamReader = new StreamReader(blobStream, Encoding.UTF8);
    while (streamReader.Peek() != -1)
    {
        string line = await streamReader.ReadLineAsync();
        contents.Add(line);
    }

    return contents;
}

导入设备

通过 RegistryManager 类中的 ImportDevicesAsync 方法,可以在 IoT 中心标识注册表中执行批量导入和同步操作。 如同 ExportDevicesAsync 方法,ImportDevicesAsync 方法也使用作业框架。

请小心使用 ImportDevicesAsync 方法,因为除了在标识注册表中预配新设备以外,它还可以更新和删除现有设备。

警告

导入操作不可撤消。 请始终先使用 ExportDevicesAsync 方法将现有数据备份到其他 Blob 容器,再对标识注册表进行批量更改。

ImportDevicesAsync 方法采用两个参数:

  • 一个字符串,其中包含作为作业的输入使用的 Azure 存储 Blob 容器的 URI。 此 URI 必须包含可授予容器读取权限的 SAS 令牌。 此容器必须包含名为 devices.txt 的 Blob,其中包含要导入标识注册表的序列化设备数据。 导入数据包含的设备信息必须采用 ExportImportDevice 作业在创建 devices.txt Blob 时使用的同一种 JSON 格式。 SAS 令牌必须包含这些权限:

    SharedAccessBlobPermissions.Read
    
  • 一个字符串,其中包含作为作业的输出使用的 Azure 存储 Blob 容器的 URI。 该作业会在此容器中创建块 Blob,以存储来自已完成的导入作业的任何错误信息。 SAS 令牌必须包含这些权限:

    SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.Read 
       | SharedAccessBlobPermissions.Delete
    

注意

这两个参数可以指向同一 Blob 容器。 单独的参数只会让你更好地控制数据,因为输出容器需要其他权限。

以下 C# 代码片段演示如何启动导入作业:

JobProperties importJob = 
   await registryManager.ImportDevicesAsync(containerSasUri, containerSasUri);

还可以使用此方法导入设备孪生的数据。 数据输入的格式与 ExportDevicesAsync 部分中显示的格式相同。 这样,可以重新导入已导出的数据。

导入行为

可以使用 ImportDevicesAsync 方法在标识注册表中执行以下批量操作:

  • 批量注册新设备
  • 批量删除现有设备
  • 批量更改状态(启用或禁用设备)
  • 批量分配新设备身份验证密钥
  • 批量自动重新生成设备身份验证密钥
  • 批量更新孪生数据

可以在单个 ImportDevicesAsync 调用中执行上述操作的任意组合。 例如,可以同时注册新设备并删除或更新现有设备。 与 ExportDevicesAsync 方法一起使用时,可以将一个 IoT 中心内的所有设备全部迁移到另一个 IoT 中心。

可以在每个设备的导入序列化数据中使用可选 importMode 属性来控制每个设备的导入过程。 importMode 属性具有以下选项:

  • 创建
  • CreateOrUpdate(默认)
  • CreateOrUpdateIfMatchETag
  • 删除
  • DeleteIfMatchETag
  • 更新
  • UpdateIfMatchETag
  • UpdateTwin
  • UpdateTwinIfMatchETag

有关每个导入模式选项的详细信息,请参阅 ImportMode

排查导入作业问题

使用导入作业创建设备时,当设备接近 Azure IoT 中心的设备计数限制时,配额可能会失败。 即使总设备计数仍低于配额限制,也会发生这种失败的情况。 返回 IotHubQuotaExceededed (403002) 错误,并显示以下错误消息:“IotHub 上的设备总数超过了分配的配额。”

如果收到此错误,可以使用以下查询返回 Azure IoT 中心注册的设备总数:

SELECT COUNT() as totalNumberOfDevices FROM devices

有关可注册到 IoT 中心的设备总数的信息,请参阅 Azure IoT 中心限制

如果仍有可用配额,则可以检查作业输出 blob 中出现 IotHubQuotaExceeded (403002) 错误的设备。 然后,可以尝试将这些设备单独添加到 Azure IoT 中心。 例如,可以使用 AddDeviceAsyncAddDeviceWithTwinAsync 方法。 请勿尝试使用其他作业添加设备,因为可能会遇到相同的错误。

导入设备示例 – 批量预配设备

以下 C# 代码片段(来自 SDK 示例中的 GenerateDevicesAsync 方法)演示了如何生成多个设备标识,这些标识:

  • 包括身份验证密钥。
  • 将该设备信息写入块 blob。
  • 将设备导入标识注册表。
private async Task GenerateDevicesAsync(RegistryManager registryManager, int numToAdd)
{
    var stopwatch = Stopwatch.StartNew();

    Console.WriteLine($"Creating {numToAdd} devices for the source IoT hub.");
    int interimProgressCount = 0;
    int displayProgressCount = 1000;
    int totalProgressCount = 0;

    // generate reference for list of new devices we're going to add, will write list to this blob
    BlobClient generateDevicesBlob = _blobContainerClient.GetBlobClient(_generateDevicesBlobName);

    // define serializedDevices as a generic list<string>
    var serializedDevices = new List<string>(numToAdd);

    for (int i = 1; i <= numToAdd; i++)
    {
        // Create device name with this format: Hub_00000000 + a new guid.
        // This should be large enough to display the largest number (1 million).
        string deviceName = $"Hub_{i:D8}_{Guid.NewGuid()}";
        Debug.Print($"Adding device '{deviceName}'");

        // Create a new ExportImportDevice.
        var deviceToAdd = new ExportImportDevice
        {
            Id = deviceName,
            Status = DeviceStatus.Enabled,
            Authentication = new AuthenticationMechanism
            {
                SymmetricKey = new SymmetricKey
                {
                    PrimaryKey = GenerateKey(32),
                    SecondaryKey = GenerateKey(32),
                }
            },
            // This indicates that the entry should be added as a new device.
            ImportMode = ImportMode.Create,
        };

        // Add device to the list as a serialized object.
        serializedDevices.Add(JsonConvert.SerializeObject(deviceToAdd));

        // Not real progress as you write the new devices, but will at least show *some* progress.
        interimProgressCount++;
        totalProgressCount++;
        if (interimProgressCount >= displayProgressCount)
        {
            Console.WriteLine($"Added {totalProgressCount}/{numToAdd} devices.");
            interimProgressCount = 0;
        }
    }

    // Now have a list of devices to be added, each one has been serialized.
    // Write the list to the blob.
    var sb = new StringBuilder();
    serializedDevices.ForEach(serializedDevice => sb.AppendLine(serializedDevice));

    // Write list of serialized objects to the blob.
    using Stream stream = await generateDevicesBlob.OpenWriteAsync(overwrite: true);
    byte[] bytes = Encoding.UTF8.GetBytes(sb.ToString());
    for (int i = 0; i < bytes.Length; i += BlobWriteBytes)
    {
        int length = Math.Min(bytes.Length - i, BlobWriteBytes);
        await stream.WriteAsync(bytes.AsMemory(i, length));
    }
    await stream.FlushAsync();

    Console.WriteLine("Running a registry manager job to add the devices.");

    // Should now have a file with all the new devices in it as serialized objects in blob storage.
    // generatedListBlob has the list of devices to be added as serialized objects.
    // Call import using the blob to add the new devices.
    // Log information related to the job is written to the same container.
    // This normally takes 1 minute per 100 devices (according to the docs).

    // First, initiate an import job.
    // This reads in the rows from the text file and writes them to IoT Devices.
    // If you want to add devices from a file, you can create a file and use this to import it.
    //   They have to be in the exact right format.
    try
    {
        // The first URI is the container to import from; the file defaults to devices.txt, but may be specified.
        // The second URI points to the container to write errors to as a blob.
        // This lets you import the devices from any file name. Since we wrote the new
        // devices to [devicesToAdd], need to read the list from there as well.
        var importGeneratedDevicesJob = JobProperties.CreateForImportJob(
            _containerUri,
            _containerUri,
            _generateDevicesBlobName);
        importGeneratedDevicesJob = await registryManager.ImportDevicesAsync(importGeneratedDevicesJob);
        await WaitForJobAsync(registryManager, importGeneratedDevicesJob);
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Adding devices failed due to {ex.Message}");
    }

    stopwatch.Stop();
    Console.WriteLine($"GenerateDevices, time elapsed = {stopwatch.Elapsed}.");
}

导入设备示例 – 批量删除

以下 C# 代码片段(SDK 示例中的 DeleteFromHubAsync 方法)演示如何从 IoT 中心删除所有设备:

private async Task DeleteFromHubAsync(RegistryManager registryManager, bool includeConfigurations)
{
    var stopwatch = Stopwatch.StartNew();

    Console.WriteLine("Deleting all devices from an IoT hub.");

    Console.WriteLine("Exporting a list of devices from IoT hub to blob storage.");

    // Read from storage, which contains serialized objects.
    // Write each line to the serializedDevices list.
    BlobClient devicesBlobClient = _blobContainerClient.GetBlobClient(_destHubDevicesImportBlobName);

    Console.WriteLine("Reading the list of devices in from blob storage.");
    List<string> serializedDevices = await ReadFromBlobAsync(devicesBlobClient);

    // Step 1: Update each device's ImportMode to be Delete
    Console.WriteLine("Updating ImportMode to be 'Delete' for each device and writing back to the blob.");
    var sb = new StringBuilder();
    serializedDevices.ForEach(serializedEntity =>
    {
        // Deserialize back to an ExportImportDevice and change import mode.
        ExportImportDevice device = JsonConvert.DeserializeObject<ExportImportDevice>(serializedEntity);
        device.ImportMode = ImportMode.Delete;

        // Reserialize the object now that we've updated the property.
        sb.AppendLine(JsonConvert.SerializeObject(device));
    });

    // Step 2: Write the list in memory to the blob.
    BlobClient deleteDevicesBlobClient = _blobContainerClient.GetBlobClient(_hubDevicesCleanupBlobName);
    await WriteToBlobAsync(deleteDevicesBlobClient, sb.ToString());

    // Step 3: Call import using the same blob to delete all devices.
    Console.WriteLine("Running a registry manager job to delete the devices from the IoT hub.");
    var importJob = JobProperties.CreateForImportJob(
        _containerUri,
        _containerUri,
        _hubDevicesCleanupBlobName);
    importJob = await registryManager.ImportDevicesAsync(importJob);
    await WaitForJobAsync(registryManager, importJob);

    // Step 4: delete configurations
    if (includeConfigurations)
    {
        BlobClient configsBlobClient = _blobContainerClient.GetBlobClient(_srcHubConfigsExportBlobName);
        List<string> serializedConfigs = await ReadFromBlobAsync(configsBlobClient);
        foreach (string serializedConfig in serializedConfigs)
        {
            try
            {
                Configuration config = JsonConvert.DeserializeObject<Configuration>(serializedConfig);
                await registryManager.RemoveConfigurationAsync(config.Id);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Failed to deserialize or remove a config.\n\t{serializedConfig}\n\n{ex.Message}");
            }
        }
    }

    stopwatch.Stop();
    Console.WriteLine($"Deleted IoT hub devices and configs: time elapsed = {stopwatch.Elapsed}");
}

获取容器 SAS URI

下面的代码示例演示如何使用 Blob 容器的读取、写入和删除权限生成 SAS URI

static string GetContainerSasUri(CloudBlobContainer container)
{
  // Set the expiry time and permissions for the container.
  // In this case no start time is specified, so the
  // shared access signature becomes valid immediately.
  var sasConstraints = new SharedAccessBlobPolicy();
  sasConstraints.SharedAccessExpiryTime = DateTime.UtcNow.AddHours(24);
  sasConstraints.Permissions = 
    SharedAccessBlobPermissions.Write | 
    SharedAccessBlobPermissions.Read | 
    SharedAccessBlobPermissions.Delete;

  // Generate the shared access signature on the container,
  // setting the constraints directly on the signature.
  string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);

  // Return the URI string for the container,
  // including the SAS token.
  return container.Uri + sasContainerToken;
}

后续步骤

在本文中,已学习如何对 IoT 中心内的标识注册表执行批量操作。 其中许多操作(包括如何将设备从一个中心移到另一个中心)都会在如何使用 Azure 资源管理器模板手动迁移 Azure IoT 中心的“管理已注册到 IoT 中心的设备”部分中用到。