批量导出和导入 Azure 通知中心注册

在某些情况下,我们需要在通知中心创建或修改大量的注册。 其中的某些情况是先进行标记更新,再进行批量计算,或者迁移现有的推送实施方案以使用 Azure 通知中心。

本文介绍如何针对通知中心执行大量操作,或者批量导出所有注册。

注意:批量导入/导出仅适用于“标准”定价层

概要工作流

批处理支持旨在支持涉及到数百万个注册的长时间运行的作业。 为了实现这种规模,批处理支持使用 Azure 存储来存储作业详细信息和输出。 执行批量更新操作时,用户需要在 Blob 容器中创建一个文件,其内容是注册更新操作的列表。 启动该作业时,用户需提供输入 Blob 的 URL,以及输出目录(也在 Blob 容器中)的 URL。 启动该作业后,用户可以通过查询启动作业时提供的 URL 位置来检查状态。 特定的作业只能执行特定类型的操作(创建、更新或删除)。 导出操作以类似的方式执行。

Import

设置

本部分假设存在以下实体:

创建输入文件并将其存储在 Blob 中

输入文件包含以 XML 格式序列化的注册列表,每行包含一个注册。 以下代码示例演示如何使用 Azure SDK 序列化注册并将其上传到 Blob 容器:

private static async Task SerializeToBlobAsync(BlobContainerClient container, RegistrationDescription[] descriptions)
{
     StringBuilder builder = new StringBuilder();
     foreach (var registrationDescription in descriptions)
     {
          builder.AppendLine(registrationDescription.Serialize());
     }

     var inputBlob = container.GetBlobClient(INPUT_FILE_NAME);
     using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
     {
         await inputBlob.UploadAsync(stream);
     }
}

重要

上面的代码在内存中序列化注册,然后将整个流上传到 Blob。 如果上传的文件大小只有几个 MB,请参阅有关如何执行这些步骤的 Azure Blob 指导,例如 块 Blob

创建 URL 标记

上传输入文件后,生成要提供给通知中心的输入文件 URL 和输出目录 URL。 可为输入和输出使用两个不同的 Blob 容器。

static Uri GetOutputDirectoryUrl(BlobContainerClient container)
{
      Console.WriteLine(container.CanGenerateSasUri);
      BlobSasBuilder builder = new BlobSasBuilder(BlobSasPermissions.All, DateTime.UtcNow.AddDays(1));
      return container.GenerateSasUri(builder);
}

static Uri GetInputFileUrl(BlobContainerClient container, string filePath)
{
      Console.WriteLine(container.CanGenerateSasUri);
      BlobSasBuilder builder = new BlobSasBuilder(BlobSasPermissions.Read, DateTime.UtcNow.AddDays(1));
      return container.GenerateSasUri(builder);
}

提交作业

生成两个输入和输出 URL 后,接下来可以启动批处理作业。

NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
var job = await client.SubmitNotificationHubJobAsync(
     new NotificationHubJob {
             JobType = NotificationHubJobType.ImportCreateRegistrations,
             OutputContainerUri = outputContainerSasUri,
             ImportFileUri = inputFileSasUri
         }
     );

long i = 10;
while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
{
    job = await client.GetNotificationHubJobAsync(job.JobId);
    await Task.Delay(1000);
    i--;
}

此示例除了创建输入和输出 URL 以外,还会创建一个 NotificationHubJob 对象,该对象包含以下类型之一的 JobType 对象:

  • ImportCreateRegistrations
  • ImportUpdateRegistrations
  • ImportDeleteRegistrations

完成调用后,通知中心会继续运行该作业,可以通过调用 GetNotificationHubJobAsync 来检查作业状态。

完成该作业后,可以通过查看输出目录中的以下文件来检查结果:

  • /<hub>/<jobid>/Failed.txt
  • /<hub>/<jobid>/Output.txt

这些文件包含成功和失败的批处理操作列表。 文件的格式为 .cvs,其中,每行包含原始输入文件的行号,以及操作的输出(通常是创建或更新的注册说明)。

完整示例代码

以下示例代码将注册导入到通知中心。

using Microsoft.Azure.NotificationHubs;
using Azure.Storage.Blobs;
using Azure.Storage.Sas;
using System.Text;

namespace ConsoleApplication1
{
    class Program
    {
        private static string CONNECTION_STRING = "namespace"; 
        private static string HUB_NAME = "demohub";
        private static string INPUT_FILE_NAME = "CreateFile.txt";
        private static string STORAGE_ACCOUNT_CONNECTIONSTRING = "connectionstring";
        private static string CONTAINER_NAME = "containername";

        static async Task Main(string[] args)
        {
            var descriptions = new[]
            {
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMkUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMjUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMhUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMdUxREQFBlVTTkMwMQ"),
            };

            // Get a reference to a container named "sample-container" and then create it
            BlobContainerClient container = new BlobContainerClient(STORAGE_ACCOUNT_CONNECTIONSTRING, CONTAINER_NAME);

            await container.CreateIfNotExistsAsync();

            await SerializeToBlobAsync(container, descriptions);

            // TODO then create Sas
            var outputContainerSasUri = GetOutputDirectoryUrl(container);

            BlobContainerClient inputcontainer = new BlobContainerClient(STORAGE_ACCOUNT_CONNECTIONSTRING, STORAGE_ACCOUNT_CONNECTIONSTRING + "/" +         INPUT_FILE_NAME);

            var inputFileSasUri = GetInputFileUrl(inputcontainer, INPUT_FILE_NAME);

            // Import this file
            NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
            var job = await client.SubmitNotificationHubJobAsync(
                new NotificationHubJob {
                    JobType = NotificationHubJobType.ImportCreateRegistrations,
                    OutputContainerUri = outputContainerSasUri,
                    ImportFileUri = inputFileSasUri
                }
            );

            long i = 10;
            while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
            {
                job = await client.GetNotificationHubJobAsync(job.JobId);
                await Task.Delay(1000);
                i--;
            }
        }

        private static async Task SerializeToBlobAsync(BlobContainerClient container, RegistrationDescription[] descriptions)
        {
            StringBuilder builder = new StringBuilder();
            foreach (var registrationDescription in descriptions)
            {
                builder.AppendLine(registrationDescription.Serialize());
            }

            var inputBlob = container.GetBlobClient(INPUT_FILE_NAME);
            using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
            {
                await inputBlob.UploadAsync(stream);
            }
        }

        static Uri GetOutputDirectoryUrl(BlobContainerClient container)
        {
            Console.WriteLine(container.CanGenerateSasUri);
            BlobSasBuilder builder = new BlobSasBuilder(BlobSasPermissions.All, DateTime.UtcNow.AddDays(1));
            return container.GenerateSasUri(builder);
        }

        static Uri GetInputFileUrl(BlobContainerClient container, string filePath)
        {
            Console.WriteLine(container.CanGenerateSasUri);
            BlobSasBuilder builder = new BlobSasBuilder(BlobSasPermissions.Read, DateTime.UtcNow.AddDays(1));
            return container.GenerateSasUri(builder);

        }
    }
}

Export

导出注册的过程类似于导入,但存在以下差别:

  • 只需提供输出 URL。
  • 创建 ExportRegistrations 类型的 NotificationHubJob。

示例代码片段

下面是在 Java 中导出注册的示例代码片段:

// Submit an export job
NotificationHubJob job = new NotificationHubJob();
job.setJobType(NotificationHubJobType.ExportRegistrations);
job.setOutputContainerUri("container uri with SAS signature");
job = hub.submitNotificationHubJob(job);

// Wait until the job is done
while(true){
    Thread.sleep(1000);
    job = hub.getNotificationHubJob(job.getJobId());
    if(job.getJobStatus() == NotificationHubJobStatus.Completed)
        break;
}

后续步骤

若要详细了解注册,请参阅以下文章: