批量导出和导入 Azure 通知中心注册
在某些情况下,我们需要在通知中心创建或修改大量的注册。 其中的某些情况是先进行标记更新,再进行批量计算,或者迁移现有的推送实施方案以使用 Azure 通知中心。
本文介绍如何针对通知中心执行大量操作,或者批量导出所有注册。
注意:批量导入/导出仅适用于“标准”定价层
概要工作流
批处理支持旨在支持涉及到数百万个注册的长时间运行的作业。 为了实现这种规模,批处理支持使用 Azure 存储来存储作业详细信息和输出。 执行批量更新操作时,用户需要在 Blob 容器中创建一个文件,其内容是注册更新操作的列表。 启动该作业时,用户需提供输入 Blob 的 URL,以及输出目录(也在 Blob 容器中)的 URL。 启动该作业后,用户可以通过查询启动作业时提供的 URL 位置来检查状态。 特定的作业只能执行特定类型的操作(创建、更新或删除)。 导出操作以类似的方式执行。
Import
设置
本部分假设存在以下实体:
- 一个预配的通知中心。
- 一个 Azure 存储 Blob 容器。
- 对 Azure 存储 NuGet 包和通知中心 NuGet 包的引用。
创建输入文件并将其存储在 Blob 中
输入文件包含以 XML 格式序列化的注册列表,每行包含一个注册。 以下代码示例演示如何使用 Azure SDK 序列化注册并将其上传到 Blob 容器:
private static async Task SerializeToBlobAsync(BlobContainerClient container, RegistrationDescription[] descriptions)
{
StringBuilder builder = new StringBuilder();
foreach (var registrationDescription in descriptions)
{
builder.AppendLine(registrationDescription.Serialize());
}
var inputBlob = container.GetBlobClient(INPUT_FILE_NAME);
using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
{
await inputBlob.UploadAsync(stream);
}
}
重要
上面的代码在内存中序列化注册,然后将整个流上传到 Blob。 如果上传的文件大小只有几个 MB,请参阅有关如何执行这些步骤的 Azure Blob 指导,例如 块 Blob。
创建 URL 标记
上传输入文件后,生成要提供给通知中心的输入文件 URL 和输出目录 URL。 可为输入和输出使用两个不同的 Blob 容器。
static Uri GetOutputDirectoryUrl(BlobContainerClient container)
{
Console.WriteLine(container.CanGenerateSasUri);
BlobSasBuilder builder = new BlobSasBuilder(BlobSasPermissions.All, DateTime.UtcNow.AddDays(1));
return container.GenerateSasUri(builder);
}
static Uri GetInputFileUrl(BlobContainerClient container, string filePath)
{
Console.WriteLine(container.CanGenerateSasUri);
BlobSasBuilder builder = new BlobSasBuilder(BlobSasPermissions.Read, DateTime.UtcNow.AddDays(1));
return container.GenerateSasUri(builder);
}
提交作业
生成两个输入和输出 URL 后,接下来可以启动批处理作业。
NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
var job = await client.SubmitNotificationHubJobAsync(
new NotificationHubJob {
JobType = NotificationHubJobType.ImportCreateRegistrations,
OutputContainerUri = outputContainerSasUri,
ImportFileUri = inputFileSasUri
}
);
long i = 10;
while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
{
job = await client.GetNotificationHubJobAsync(job.JobId);
await Task.Delay(1000);
i--;
}
此示例除了创建输入和输出 URL 以外,还会创建一个 NotificationHubJob
对象,该对象包含以下类型之一的 JobType
对象:
ImportCreateRegistrations
ImportUpdateRegistrations
ImportDeleteRegistrations
完成调用后,通知中心会继续运行该作业,可以通过调用 GetNotificationHubJobAsync 来检查作业状态。
完成该作业后,可以通过查看输出目录中的以下文件来检查结果:
/<hub>/<jobid>/Failed.txt
/<hub>/<jobid>/Output.txt
这些文件包含成功和失败的批处理操作列表。 文件的格式为 .cvs
,其中,每行包含原始输入文件的行号,以及操作的输出(通常是创建或更新的注册说明)。
完整示例代码
以下示例代码将注册导入到通知中心。
using Microsoft.Azure.NotificationHubs;
using Azure.Storage.Blobs;
using Azure.Storage.Sas;
using System.Text;
namespace ConsoleApplication1
{
class Program
{
private static string CONNECTION_STRING = "namespace";
private static string HUB_NAME = "demohub";
private static string INPUT_FILE_NAME = "CreateFile.txt";
private static string STORAGE_ACCOUNT_CONNECTIONSTRING = "connectionstring";
private static string CONTAINER_NAME = "containername";
static async Task Main(string[] args)
{
var descriptions = new[]
{
new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMkUxREQFBlVTTkMwMQ"),
new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMjUxREQFBlVTTkMwMQ"),
new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMhUxREQFBlVTTkMwMQ"),
new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMdUxREQFBlVTTkMwMQ"),
};
// Get a reference to a container named "sample-container" and then create it
BlobContainerClient container = new BlobContainerClient(STORAGE_ACCOUNT_CONNECTIONSTRING, CONTAINER_NAME);
await container.CreateIfNotExistsAsync();
await SerializeToBlobAsync(container, descriptions);
// TODO then create Sas
var outputContainerSasUri = GetOutputDirectoryUrl(container);
BlobContainerClient inputcontainer = new BlobContainerClient(STORAGE_ACCOUNT_CONNECTIONSTRING, STORAGE_ACCOUNT_CONNECTIONSTRING + "/" + INPUT_FILE_NAME);
var inputFileSasUri = GetInputFileUrl(inputcontainer, INPUT_FILE_NAME);
// Import this file
NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
var job = await client.SubmitNotificationHubJobAsync(
new NotificationHubJob {
JobType = NotificationHubJobType.ImportCreateRegistrations,
OutputContainerUri = outputContainerSasUri,
ImportFileUri = inputFileSasUri
}
);
long i = 10;
while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
{
job = await client.GetNotificationHubJobAsync(job.JobId);
await Task.Delay(1000);
i--;
}
}
private static async Task SerializeToBlobAsync(BlobContainerClient container, RegistrationDescription[] descriptions)
{
StringBuilder builder = new StringBuilder();
foreach (var registrationDescription in descriptions)
{
builder.AppendLine(registrationDescription.Serialize());
}
var inputBlob = container.GetBlobClient(INPUT_FILE_NAME);
using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
{
await inputBlob.UploadAsync(stream);
}
}
static Uri GetOutputDirectoryUrl(BlobContainerClient container)
{
Console.WriteLine(container.CanGenerateSasUri);
BlobSasBuilder builder = new BlobSasBuilder(BlobSasPermissions.All, DateTime.UtcNow.AddDays(1));
return container.GenerateSasUri(builder);
}
static Uri GetInputFileUrl(BlobContainerClient container, string filePath)
{
Console.WriteLine(container.CanGenerateSasUri);
BlobSasBuilder builder = new BlobSasBuilder(BlobSasPermissions.Read, DateTime.UtcNow.AddDays(1));
return container.GenerateSasUri(builder);
}
}
}
Export
导出注册的过程类似于导入,但存在以下差别:
- 只需提供输出 URL。
- 创建 ExportRegistrations 类型的 NotificationHubJob。
示例代码片段
下面是在 Java 中导出注册的示例代码片段:
// Submit an export job
NotificationHubJob job = new NotificationHubJob();
job.setJobType(NotificationHubJobType.ExportRegistrations);
job.setOutputContainerUri("container uri with SAS signature");
job = hub.submitNotificationHubJob(job);
// Wait until the job is done
while(true){
Thread.sleep(1000);
job = hub.getNotificationHubJob(job.getJobId());
if(job.getJobStatus() == NotificationHubJobStatus.Completed)
break;
}
后续步骤
若要详细了解注册,请参阅以下文章: