Use Azure Queue storage to monitor Media Services job notifications with .NET

When you run encoding jobs, you often require a way to track job progress. You can configure Media Services to deliver notifications to Azure Queue storage. You can monitor job progress by getting notifications from the Queue storage.

Messages delivered to Queue storage can be accessed from anywhere in the world. The Queue storage messaging architecture is reliable and highly scalable.

One common scenario for listening to Media Services notifications is if you are developing a content management system that needs to perform some additional task after an encoding job completes (for example, to trigger the next step in a workflow, or to publish content).

This article shows how to get notification messages from Queue storage.

Considerations

Consider the following when developing Media Services applications that use Queue storage:

.NET code example

The code example in this section does the following:

  1. Defines the EncodingJobMessage class that maps to the notification message format. The code deserializes messages received from the queue into objects of the EncodingJobMessage type.

  2. Loads the Media Services and Storage account information from the app.config file. The code example uses this information to create the CloudMediaContext and CloudQueue objects.

  3. Creates the queue that receives notification messages about the encoding job.

  4. Creates the notification end point that is mapped to the queue.

  5. Attaches the notification end point to the job and submits the encoding job. You can have multiple notification end points attached to a job.

  6. Passes NotificationJobState.FinalStatesOnly to the AddNew method. (In this example, we are only interested in final states of the job processing.)

    job.JobNotificationSubscriptions.AddNew(NotificationJobState.FinalStatesOnly, _notificationEndPoint);
    
  7. If you pass NotificationJobState.All, you get all of the following state change notifications: queued, scheduled, processing, and finished. However, as noted earlier, Queue storage does not guarantee ordered delivery. To order messages, use the Timestamp property (defined on the EncodingJobMessage type in the example below). Duplicate messages are possible. To check for duplicates, use the ETag property (defined on the EncodingJobMessage type). It is also possible that some state change notifications get skipped.

  8. Waits for the job to get to the finished state by checking the queue every 10 seconds. Deletes messages after they have been processed.

  9. Deletes the queue and the notification end point.

Note

The best way to monitor a job’s state is by listening to notification messages, as shown in the following example:

Alternatively, you could check on a job’s state by using the IJob.State property. A notification message about a job’s completion may arrive before the state on IJob is set to Finished. The IJob.State property reflects the accurate state with a slight delay.

Create and configure a Visual Studio project

  1. Set up your development environment and populate the app.config file with connection information, as described in Media Services development with .NET.
  2. Create a new folder (folder can be anywhere on your local drive) and copy a .mp4 file that you want to encode and stream or progressively download. In this example, the "C:\Media" path is used.
  3. Add a reference to the System.Runtime.Serialization library.

Code

using System;
using System.Linq;
using System.Configuration;
using System.IO;
using System.Threading;
using System.Collections.Generic;
using Microsoft.WindowsAzure.MediaServices.Client;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;
using System.Runtime.Serialization.Json;

namespace JobNotification
{
    public class EncodingJobMessage
    {
        // MessageVersion is used for version control.
        public String MessageVersion { get; set; }

        // Type of the event. Valid values are
        // JobStateChange and NotificationEndpointRegistration.
        public String EventType { get; set; }

        // ETag is used to help the customer detect if
        // the message is a duplicate of another message previously sent.
        public String ETag { get; set; }

        // Time of occurrence of the event.
        public String TimeStamp { get; set; }

        // Collection of values specific to the event.

        // For the JobStateChange event the values are:
        //     JobId - Id of the Job that triggered the notification.
        //     NewState- The new state of the Job. Valid values are:
        //          Scheduled, Processing, Canceling, Cancelled, Error, Finished
        //     OldState- The old state of the Job. Valid values are:
        //          Scheduled, Processing, Canceling, Cancelled, Error, Finished

        // For the NotificationEndpointRegistration event the values are:
        //     NotificationEndpointId- Id of the NotificationEndpoint
        //          that triggered the notification.
        //     State- The state of the Endpoint.
        //          Valid values are: Registered and Unregistered.

        public IDictionary<string, object> Properties { get; set; }
    }

    class Program
    {

        // Read values from the App.config file.
        private static readonly string _AADTenantDomain =
            ConfigurationManager.AppSettings["AMSAADTenantDomain"];
        private static readonly string _RESTAPIEndpoint =
            ConfigurationManager.AppSettings["AMSRESTAPIEndpoint"];
        private static readonly string _AMSClientId =
            ConfigurationManager.AppSettings["AMSClientId"];
        private static readonly string _AMSClientSecret =
            ConfigurationManager.AppSettings["AMSClientSecret"];

        private static readonly string _StorageConnectionString =
            ConfigurationManager.AppSettings["StorageConnectionString"];

        private static CloudMediaContext _context = null;
        private static QueueClient _queue = null;
        private static INotificationEndPoint _notificationEndPoint = null;

        private static readonly string _singleInputMp4Path =
            Path.GetFullPath(@"C:\Media\BigBuckBunny.mp4");

        static void Main(string[] args)
        {
            string endPointAddress = Guid.NewGuid().ToString();
            string queueName = "queueName";

            // Create the context.
            AzureAdTokenCredentials tokenCredentials =
                new AzureAdTokenCredentials(_AADTenantDomain,
                    new AzureAdClientSymmetricKey(_AMSClientId, _AMSClientSecret),
                    AzureEnvironments.AzureCloudEnvironment);

            var tokenProvider = new AzureAdTokenProvider(tokenCredentials);

            _context = new CloudMediaContext(new Uri(_RESTAPIEndpoint), tokenProvider);

            // Create the queue that will be receiving the notification messages.
            _queue = CreateQueue(_StorageConnectionString, queueName);

            // Create the notification point that is mapped to the queue.
            _notificationEndPoint =
                    _context.NotificationEndPoints.Create(
                    Guid.NewGuid().ToString(), NotificationEndPointType.AzureQueue, endPointAddress);


            if (_notificationEndPoint != null)
            {
                IJob job = SubmitEncodingJobWithNotificationEndPoint(_singleInputMp4Path);
                WaitForJobToReachedFinishedState(job.Id);
            }

            // Clean up.
            _queue.Delete();
            _notificationEndPoint.Delete();
        }


        static public QueueClient CreateQueue(string storageAccountConnectionString, string queueName)
        {
            // Create the queue client
            QueueClient queue = new QueueClient(storageAccountConnectionString, queueName);

            // Create the queue if it doesn't already exist
            queue.CreateIfNotExistsAsync();

            return queue;
        }


        public static IJob SubmitEncodingJobWithNotificationEndPoint(string inputMediaFilePath)
        {
            // Declare a new job.
            IJob job = _context.Jobs.Create("My MP4 to Smooth Streaming encoding job");

            //Create an encrypted asset and upload the mp4.
            IAsset asset = CreateAssetAndUploadSingleFile(AssetCreationOptions.StorageEncrypted,
                inputMediaFilePath);

            // Get a media processor reference, and pass to it the name of the
            // processor to use for the specific task.
            IMediaProcessor processor = GetLatestMediaProcessorByName("Media Encoder Standard");

            // Create a task with the conversion details, using a configuration file.
            ITask task = job.Tasks.AddNew("My encoding Task",
                processor,
                "Adaptive Streaming",
                Microsoft.WindowsAzure.MediaServices.Client.TaskOptions.None);

            // Specify the input asset to be encoded.
            task.InputAssets.Add(asset);

            // Add an output asset to contain the results of the job.
            task.OutputAssets.AddNew("Output asset",
                AssetCreationOptions.None);

            // Add a notification point to the job. You can add multiple notification points.
            job.JobNotificationSubscriptions.AddNew(NotificationJobState.FinalStatesOnly,
                _notificationEndPoint);

            job.Submit();

            return job;
        }

        public static void WaitForJobToReachedFinishedState(string jobId)
        {
            int expectedState = (int)JobState.Finished;
            int timeOutInSeconds = 600;

            bool jobReachedExpectedState = false;
            DateTime startTime = DateTime.Now;
            int jobState = -1;

            while (!jobReachedExpectedState)
            {
                // Specify how often you want to get messages from the queue.
                Thread.Sleep(TimeSpan.FromSeconds(10));

                foreach (QueueMessage message in _queue.ReceiveMessages(maxMessages: 10).Value)
                {
                    DataContractJsonSerializerSettings settings = new DataContractJsonSerializerSettings();
                    settings.UseSimpleDictionaryFormat = true;
                    DataContractJsonSerializer ser = new DataContractJsonSerializer(typeof(EncodingJobMessage), settings);
                    EncodingJobMessage encodingJobMsg = (EncodingJobMessage)ser.ReadObject(message.Body.ToStream);

                    Console.WriteLine();

                    // Display the message information.
                    Console.WriteLine("EventType: {0}", encodingJobMsg.EventType);
                    Console.WriteLine("MessageVersion: {0}", encodingJobMsg.MessageVersion);
                    Console.WriteLine("ETag: {0}", encodingJobMsg.ETag);
                    Console.WriteLine("TimeStamp: {0}", encodingJobMsg.TimeStamp);
                    foreach (var property in encodingJobMsg.Properties)
                    {
                        Console.WriteLine("    {0}: {1}", property.Key, property.Value);
                    }

                    // We are only interested in messages
                    // where EventType is "JobStateChange".
                    if (encodingJobMsg.EventType == "JobStateChange")
                    {
                        string JobId = (String)encodingJobMsg.Properties.Where(j => j.Key == "JobId").FirstOrDefault().Value;
                        if (JobId == jobId)
                        {
                            string oldJobStateStr = (String)encodingJobMsg.Properties.
                                                            Where(j => j.Key == "OldState").FirstOrDefault().Value;
                            string newJobStateStr = (String)encodingJobMsg.Properties.
                                                            Where(j => j.Key == "NewState").FirstOrDefault().Value;

                            JobState oldJobState = (JobState)Enum.Parse(typeof(JobState), oldJobStateStr);
                            JobState newJobState = (JobState)Enum.Parse(typeof(JobState), newJobStateStr);

                            if (newJobState == (JobState)expectedState)
                            {
                                Console.WriteLine("job with Id: {0} reached expected state: {1}",
                                    jobId, newJobState);
                                jobReachedExpectedState = true;
                                break;
                            }
                        }
                    }
                    // Delete the message after we've read it.
                    _queue.DeleteMessage(message.MessageId, message.PopReceipt);
                }

                // Wait until timeout
                TimeSpan timeDiff = DateTime.Now - startTime;
                bool timedOut = (timeDiff.TotalSeconds > timeOutInSeconds);
                if (timedOut)
                {
                    Console.WriteLine(@"Timeout for checking job notification messages,
                                        latest found state ='{0}', wait time = {1} secs",
                        jobState,
                        timeDiff.TotalSeconds);

                    throw new TimeoutException();
                }
            }
        }

        static private IAsset CreateAssetAndUploadSingleFile(AssetCreationOptions assetCreationOptions, string singleFilePath)
        {
            var asset = _context.Assets.Create("UploadSingleFile_" + DateTime.UtcNow.ToString(),
                assetCreationOptions);

            var fileName = Path.GetFileName(singleFilePath);

            var assetFile = asset.AssetFiles.Create(fileName);

            Console.WriteLine("Created assetFile {0}", assetFile.Name);
            Console.WriteLine("Upload {0}", assetFile.Name);

            assetFile.Upload(singleFilePath);
            Console.WriteLine("Done uploading of {0}", assetFile.Name);

            return asset;
        }

        static private IMediaProcessor GetLatestMediaProcessorByName(string mediaProcessorName)
        {
            var processor = _context.MediaProcessors.Where(p => p.Name == mediaProcessorName).
                ToList().OrderBy(p => new Version(p.Version)).LastOrDefault();

            if (processor == null)
                throw new ArgumentException(string.Format("Unknown media processor", mediaProcessorName));

            return processor;
        }
    }
}

The preceding example produced the following output: Your values will vary.

Created assetFile BigBuckBunny.mp4
Upload BigBuckBunny.mp4
Done uploading of BigBuckBunny.mp4

EventType: NotificationEndPointRegistration
MessageVersion: 1.0
ETag: e0238957a9b25bdf3351a88e57978d6a81a84527fad03bc23861dbe28ab293f6
TimeStamp: 2013-05-14T20:22:37
    NotificationEndPointId: nb:nepid:UUID:d6af9412-2488-45b2-ba1f-6e0ade6dbc27
    State: Registered
    Name: dde957b2-006e-41f2-9869-a978870ac620
    Created: 2013-05-14T20:22:35

EventType: JobStateChange
MessageVersion: 1.0
ETag: 4e381f37c2d844bde06ace650310284d6928b1e50101d82d1b56220cfcb6076c
TimeStamp: 2013-05-14T20:24:40
    JobId: nb:jid:UUID:526291de-f166-be47-b62a-11ffe6d4be54
    JobName: My MP4 to Smooth Streaming encoding job
    NewState: Finished
    OldState: Processing
    AccountName: westeuropewamsaccount
job with Id: nb:jid:UUID:526291de-f166-be47-b62a-11ffe6d4be54 reached expected
State: Finished