使用 Azure 队列存储通过 .NET 监视媒体服务作业通知Use Azure Queue storage to monitor Media Services job notifications with .NET

Note

不会向媒体服务 v2 添加任何新特性或新功能。No new features or functionality are being added to Media Services v2.
查看最新版本:媒体服务 v3Check out the latest version, Media Services v3. 另请参阅从 v2 到 v3 的迁移指南Also, see migration guidance from v2 to v3

运行编码作业时,通常需要采用某种方式来跟踪作业进度。When you run encoding jobs, you often require a way to track job progress. 你可以配置媒体服务将通知传送到 Azure 队列存储You can configure Media Services to deliver notifications to Azure Queue storage. 然后可以通过从队列存储获取通知来监视作业进度。You can monitor job progress by getting notifications from the Queue storage.

用户可以从任何位置访问已传给到队列存储中的消息。Messages delivered to Queue storage can be accessed from anywhere in the world. 队列存储消息体系结构十分可靠,且伸缩性极高。The Queue storage messaging architecture is reliable and highly scalable. 建议使用其他方法轮询消息的队列存储。Polling Queue storage for messages is recommended over using other methods.

一种常见的媒体服务通知侦听方案是:你正在开发一个内容管理系统,完成编码作业后,该系统需要执行一些其他任务(例如触发工作流的下一步骤或者发布内容)。One common scenario for listening to Media Services notifications is if you are developing a content management system that needs to perform some additional task after an encoding job completes (for example, to trigger the next step in a workflow, or to publish content).

本文说明如何从队列存储获取通知消息。This article shows how to get notification messages from Queue storage.

注意事项Considerations

开发使用存储队列的媒体服务应用程序时,请注意以下几点:Consider the following when developing Media Services applications that use Queue storage:

.NET 代码示例.NET code example

本部分中的代码示例将执行以下操作:The code example in this section does the following:

  1. 定义映射为通知消息格式的 EncodingJobMessage 类。Defines the EncodingJobMessage class that maps to the notification message format. 代码将从队列接收到的消息反序列化为 EncodingJobMessage 类型的对象。The code deserializes messages received from the queue into objects of the EncodingJobMessage type.

  2. 从 app.config 文件中加载媒体服务和存储帐户信息。Loads the Media Services and Storage account information from the app.config file. 本代码示例使用此信息创建 CloudMediaContextCloudQueue 对象。The code example uses this information to create the CloudMediaContext and CloudQueue objects.

  3. 创建接收编码作业相关通知消息的队列。Creates the queue that receives notification messages about the encoding job.

  4. 创建一个映射到队列的通知终结点。Creates the notification end point that is mapped to the queue.

  5. 将通知终结点附加到作业,然后提交编码作业。Attaches the notification end point to the job and submits the encoding job. 可以将多个通知终结点附加到一个作业。You can have multiple notification end points attached to a job.

  6. NotificationJobState.FinalStatesOnly 传递到 AddNew 方法。Passes NotificationJobState.FinalStatesOnly to the AddNew method. (本例中,只想了解作业处理的最终状态。)(In this example, we are only interested in final states of the job processing.)

     job.JobNotificationSubscriptions.AddNew(NotificationJobState.FinalStatesOnly, _notificationEndPoint);
    
  7. 如果传递 NotificationJobState.All,则会收到以下所有状态更改通知:已排队、已计划、处理中和已完成。If you pass NotificationJobState.All, you get all of the following state change notifications: queued, scheduled, processing, and finished. 但如前所述,队列存储不保证按顺序传递。However, as noted earlier, Queue storage does not guarantee ordered delivery. 若要对消息排序,请使用 Timestamp 属性(在以下示例的 EncodingJobMessage 类型中定义)。To order messages, use the Timestamp property (defined on the EncodingJobMessage type in the example below). 可能出现重复消息。Duplicate messages are possible. 若要检查重复项,请使用 ETag 属性(在 EncodingJobMessage 类型中定义)。To check for duplicates, use the ETag property (defined on the EncodingJobMessage type). 此外,可能会跳过某些状态更改通知。It is also possible that some state change notifications get skipped.

  8. 每 10 秒检查一次队列,等待作业进入已完成状态。Waits for the job to get to the finished state by checking the queue every 10 seconds. 处理消息后删除消息。Deletes messages after they have been processed.

  9. 删除队列和通知终结点。Deletes the queue and the notification end point.

Note

建议通过侦听通知消息来监视作业的状态,如下例所示:The recommended way to monitor a job’s state is by listening to notification messages, as shown in the following example:

或者,可以使用 IJob.State 属性检查作业状态。Alternatively, you could check on a job’s state by using the IJob.State property. IJob 的状态设置为“已完成” 之前,可能会先收到一条指示作业已完成的通知消息。A notification message about a job’s completion may arrive before the state on IJob is set to Finished. IJob.State 属性在延迟片刻之后反映正确的状态。The IJob.State property reflects the accurate state with a slight delay.

创建和配置 Visual Studio 项目Create and configure a Visual Studio project

  1. 设置开发环境,并根据使用 .NET 进行媒体服务开发中所述,在 app.config 文件中填充连接信息。Set up your development environment and populate the app.config file with connection information, as described in Media Services development with .NET.
  2. 创建新的文件夹(文件夹可以位于本地驱动器上的任何位置),复制需要编码和流处理或渐进式下载的 .mp4 文件。Create a new folder (folder can be anywhere on your local drive) and copy a .mp4 file that you want to encode and stream or progressively download. 在此示例中,我们使用了“C:\Media”路径。In this example, the "C:\Media" path is used.
  3. 将引用添加到“System.Runtime.Serialization”库中 。Add a reference to the System.Runtime.Serialization library.

代码Code

using System;
using System.Linq;
using System.Configuration;
using System.IO;
using System.Threading;
using System.Collections.Generic;
using Microsoft.WindowsAzure.MediaServices.Client;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Queue;
using System.Runtime.Serialization.Json;

namespace JobNotification
{
    public class EncodingJobMessage
    {
        // MessageVersion is used for version control.
        public String MessageVersion { get; set; }

        // Type of the event. Valid values are
        // JobStateChange and NotificationEndpointRegistration.
        public String EventType { get; set; }

        // ETag is used to help the customer detect if
        // the message is a duplicate of another message previously sent.
        public String ETag { get; set; }

        // Time of occurrence of the event.
        public String TimeStamp { get; set; }

        // Collection of values specific to the event.

        // For the JobStateChange event the values are:
        //     JobId - Id of the Job that triggered the notification.
        //     NewState- The new state of the Job. Valid values are:
        //          Scheduled, Processing, Canceling, Cancelled, Error, Finished
        //     OldState- The old state of the Job. Valid values are:
        //          Scheduled, Processing, Canceling, Cancelled, Error, Finished

        // For the NotificationEndpointRegistration event the values are:
        //     NotificationEndpointId- Id of the NotificationEndpoint
        //          that triggered the notification.
        //     State- The state of the Endpoint.
        //          Valid values are: Registered and Unregistered.

        public IDictionary<string, object> Properties { get; set; }
    }

    class Program
    {

        // Read values from the App.config file.
        private static readonly string _AADTenantDomain =
            ConfigurationManager.AppSettings["AMSAADTenantDomain"];
        private static readonly string _RESTAPIEndpoint =
            ConfigurationManager.AppSettings["AMSRESTAPIEndpoint"];
        private static readonly string _AMSClientId =
            ConfigurationManager.AppSettings["AMSClientId"];
        private static readonly string _AMSClientSecret =
            ConfigurationManager.AppSettings["AMSClientSecret"];

        private static readonly string _StorageConnectionString = 
            ConfigurationManager.AppSettings["StorageConnectionString"];

        private static CloudMediaContext _context = null;
        private static CloudQueue _queue = null;
        private static INotificationEndPoint _notificationEndPoint = null;

        private static readonly string _singleInputMp4Path =
            Path.GetFullPath(@"C:\Media\BigBuckBunny.mp4");

        static void Main(string[] args)
        {
            string endPointAddress = Guid.NewGuid().ToString();

            // Create the context.
            AzureAdTokenCredentials tokenCredentials = 
                new AzureAdTokenCredentials(_AADTenantDomain,
                    new AzureAdClientSymmetricKey(_AMSClientId, _AMSClientSecret),
                    AzureEnvironments.AzureChinaCloudEnvironment);

            var tokenProvider = new AzureAdTokenProvider(tokenCredentials);

            _context = new CloudMediaContext(new Uri(_RESTAPIEndpoint), tokenProvider);
            
            // Create the queue that will be receiving the notification messages.
            _queue = CreateQueue(_StorageConnectionString, endPointAddress);

            // Create the notification point that is mapped to the queue.
            _notificationEndPoint =
                    _context.NotificationEndPoints.Create(
                    Guid.NewGuid().ToString(), NotificationEndPointType.AzureQueue, endPointAddress);


            if (_notificationEndPoint != null)
            {
                IJob job = SubmitEncodingJobWithNotificationEndPoint(_singleInputMp4Path);
                WaitForJobToReachedFinishedState(job.Id);
            }

            // Clean up.
            _queue.Delete();
            _notificationEndPoint.Delete();
        }


        static public CloudQueue CreateQueue(string storageAccountConnectionString, string endPointAddress)
        {
            CloudStorageAccount storageAccount = CloudStorageAccount.Parse(storageAccountConnectionString);

            // Create the queue client
            CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();

            // Retrieve a reference to a queue
            CloudQueue queue = queueClient.GetQueueReference(endPointAddress);

            // Create the queue if it doesn't already exist
            queue.CreateIfNotExists();

            return queue;
        }


        public static IJob SubmitEncodingJobWithNotificationEndPoint(string inputMediaFilePath)
        {
            // Declare a new job.
            IJob job = _context.Jobs.Create("My MP4 to Smooth Streaming encoding job");

            //Create an encrypted asset and upload the mp4.
            IAsset asset = CreateAssetAndUploadSingleFile(AssetCreationOptions.StorageEncrypted,
                inputMediaFilePath);

            // Get a media processor reference, and pass to it the name of the
            // processor to use for the specific task.
            IMediaProcessor processor = GetLatestMediaProcessorByName("Media Encoder Standard");

            // Create a task with the conversion details, using a configuration file.
            ITask task = job.Tasks.AddNew("My encoding Task",
                processor,
                "Adaptive Streaming",
                Microsoft.WindowsAzure.MediaServices.Client.TaskOptions.None);

            // Specify the input asset to be encoded.
            task.InputAssets.Add(asset);

            // Add an output asset to contain the results of the job.
            task.OutputAssets.AddNew("Output asset",
                AssetCreationOptions.None);

            // Add a notification point to the job. You can add multiple notification points.  
            job.JobNotificationSubscriptions.AddNew(NotificationJobState.FinalStatesOnly,
                _notificationEndPoint);

            job.Submit();

            return job;
        }

        public static void WaitForJobToReachedFinishedState(string jobId)
        {
            int expectedState = (int)JobState.Finished;
            int timeOutInSeconds = 600;

            bool jobReachedExpectedState = false;
            DateTime startTime = DateTime.Now;
            int jobState = -1;

            while (!jobReachedExpectedState)
            {
                // Specify how often you want to get messages from the queue.
                Thread.Sleep(TimeSpan.FromSeconds(10));

                foreach (var message in _queue.GetMessages(10))
                {
                    using (Stream stream = new MemoryStream(message.AsBytes))
                    {
                        DataContractJsonSerializerSettings settings = new DataContractJsonSerializerSettings();
                        settings.UseSimpleDictionaryFormat = true;
                        DataContractJsonSerializer ser = new DataContractJsonSerializer(typeof(EncodingJobMessage), settings);
                        EncodingJobMessage encodingJobMsg = (EncodingJobMessage)ser.ReadObject(stream);

                        Console.WriteLine();

                        // Display the message information.
                        Console.WriteLine("EventType: {0}", encodingJobMsg.EventType);
                        Console.WriteLine("MessageVersion: {0}", encodingJobMsg.MessageVersion);
                        Console.WriteLine("ETag: {0}", encodingJobMsg.ETag);
                        Console.WriteLine("TimeStamp: {0}", encodingJobMsg.TimeStamp);
                        foreach (var property in encodingJobMsg.Properties)
                        {
                            Console.WriteLine("    {0}: {1}", property.Key, property.Value);
                        }

                        // We are only interested in messages
                        // where EventType is "JobStateChange".
                        if (encodingJobMsg.EventType == "JobStateChange")
                        {
                            string JobId = (String)encodingJobMsg.Properties.Where(j => j.Key == "JobId").FirstOrDefault().Value;
                            if (JobId == jobId)
                            {
                                string oldJobStateStr = (String)encodingJobMsg.Properties.
                                                            Where(j => j.Key == "OldState").FirstOrDefault().Value;
                                string newJobStateStr = (String)encodingJobMsg.Properties.
                                                            Where(j => j.Key == "NewState").FirstOrDefault().Value;

                                JobState oldJobState = (JobState)Enum.Parse(typeof(JobState), oldJobStateStr);
                                JobState newJobState = (JobState)Enum.Parse(typeof(JobState), newJobStateStr);

                                if (newJobState == (JobState)expectedState)
                                {
                                    Console.WriteLine("job with Id: {0} reached expected state: {1}",
                                        jobId, newJobState);
                                    jobReachedExpectedState = true;
                                    break;
                                }
                            }
                        }
                    }
                    // Delete the message after we've read it.
                    _queue.DeleteMessage(message);
                }

                // Wait until timeout
                TimeSpan timeDiff = DateTime.Now - startTime;
                bool timedOut = (timeDiff.TotalSeconds > timeOutInSeconds);
                if (timedOut)
                {
                    Console.WriteLine(@"Timeout for checking job notification messages,
                                        latest found state ='{0}', wait time = {1} secs",
                        jobState,
                        timeDiff.TotalSeconds);

                    throw new TimeoutException();
                }
            }
        }

        static private IAsset CreateAssetAndUploadSingleFile(AssetCreationOptions assetCreationOptions, string singleFilePath)
        {
            var asset = _context.Assets.Create("UploadSingleFile_" + DateTime.UtcNow.ToString(),
                assetCreationOptions);

            var fileName = Path.GetFileName(singleFilePath);

            var assetFile = asset.AssetFiles.Create(fileName);

            Console.WriteLine("Created assetFile {0}", assetFile.Name);
            Console.WriteLine("Upload {0}", assetFile.Name);

            assetFile.Upload(singleFilePath);
            Console.WriteLine("Done uploading of {0}", assetFile.Name);

            return asset;
        }

        static private IMediaProcessor GetLatestMediaProcessorByName(string mediaProcessorName)
        {
            var processor = _context.MediaProcessors.Where(p => p.Name == mediaProcessorName).
                ToList().OrderBy(p => new Version(p.Version)).LastOrDefault();

            if (processor == null)
                throw new ArgumentException(string.Format("Unknown media processor", mediaProcessorName));

            return processor;
        }
    }
}

上例生成了以下输出,值会有所变化。The preceding example produced the following output: Your values will vary.

Created assetFile BigBuckBunny.mp4
Upload BigBuckBunny.mp4
Done uploading of BigBuckBunny.mp4

EventType: NotificationEndPointRegistration
MessageVersion: 1.0
ETag: e0238957a9b25bdf3351a88e57978d6a81a84527fad03bc23861dbe28ab293f6
TimeStamp: 2013-05-14T20:22:37
    NotificationEndPointId: nb:nepid:UUID:d6af9412-2488-45b2-ba1f-6e0ade6dbc27
    State: Registered
    Name: dde957b2-006e-41f2-9869-a978870ac620
    Created: 2013-05-14T20:22:35

EventType: JobStateChange
MessageVersion: 1.0
ETag: 4e381f37c2d844bde06ace650310284d6928b1e50101d82d1b56220cfcb6076c
TimeStamp: 2013-05-14T20:24:40
    JobId: nb:jid:UUID:526291de-f166-be47-b62a-11ffe6d4be54
    JobName: My MP4 to Smooth Streaming encoding job
    NewState: Finished
    OldState: Processing
    AccountName: westeuropewamsaccount
job with Id: nb:jid:UUID:526291de-f166-be47-b62a-11ffe6d4be54 reached expected
State: Finished

后续步骤Next step

查看媒体服务学习路径。Review Media Services learning paths.

媒体服务 v3(最新版本)Media Services v3 (latest)

查看最新版本的 Azure 媒体服务!Check out the latest version of Azure Media Services!

媒体服务 v2(旧版)Media Services v2 (legacy)