.Net 接收 IotHub 消息的两种方式

直接接收

需要指定分区执行循环接收。ReceiveMessagesFromDeviceAsync 方法中的 eventHubClient.GetDefaultConsumerGroup().CreateReceiver() 第一个参数为分区,第二个参数为 Offset,此处为当前日期,只能接收当前启动以后的消息,也可以设置为 -1,接收全部的消息。

GitHub 示例地址:azure-iot-samples-csharp

        static string connectionString = "IotHub 的共享访问策略中的 iothubowner 的连接字符串";
        static string iotHubD2cEndpoint = "messages/events";
        static EventHubClient eventHubClient;
        static void Main(string[] args)
        {
            Console.WriteLine("Receive message. Ctrl-C to exit.\n");
            eventHubClient = EventHubClient.CreateFromConnectionString(connectionString,iotHubD2cEndpoint);
            var d2cPartitions = eventHubClient.GetRuntimeInformation().PartitionIds;
            CancellationTokenSource cts = new CancellationTokenSource();

            System.Console.CancelKeyPress += (s, e) =>
            {
                e.Cancel = true;
                cts.Cancel();
                Console.WriteLine("Exiting...");
            };
            var tasks = new List<Task>();
            foreach (string partition in d2cPartitions)
            {
                tasks.Add(ReceiveMessagesFromDeviceAsync(partition,cts.Token));
            }
            Task.WaitAll(tasks.ToArray());
        }
        /// <summary>
        /// 接收设备到 iothub 的消息(仅接收当前时间启动后发送的消息)
        /// </summary>
        /// <param name="partition">分区</param>
        /// <param name="ct"></param>
        /// <returns></returns>
        private static async Task ReceiveMessagesFromDeviceAsync(string partition, CancellationToken ct)
        {
            var eventHubReceiver = eventHubClient.GetDefaultConsumerGroup().CreateReceiver(partition,DateTime.UtcNow);
            //var eventHubReceiver = eventHubClient.GetDefaultConsumerGroup().CreateReceiver(partition,-1");
            while (true)
            {
                if (ct.IsCancellationRequested) break;
                EventData eventData = await eventHubReceiver.ReceiveAsync();
                if (eventData == null) continue;
                string data = Encoding.UTF8.GetString(eventData.GetBytes());
                Console.WriteLine("Message received. Partition: {0} Data: '{1}'", partition,data);
            }
        }

通过 EPH 方式接收消息

客户端需要从存储中的 container 读取 offset 位置信息(-1 全部读取),采用租约的方式控制并发,防止其他的客户端进入到这个 container 中读取信息。

Github 示例地址:azure-event-hubs

private const string EventHubConnectionString = "Event Hubs connection string";
        private const string EventHubName = "event hub name";
        private const string StorageContainerName = "Storage account container name";
        private const string StorageAccountName = "Storage account name";
        private const string StorageAccountKey = "Storage account key";

        private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey);

        public static void Main(string[] args)
        {
            MainAsync(args).GetAwaiter().GetResult();
        }

        private static async Task MainAsync(string[] args)
        {
            Console.WriteLine("Registering EventProcessor...");

            var eventProcessorHost = new EventProcessorHost(
                EventHubName,
                PartitionReceiver.DefaultConsumerGroupName,
                EventHubConnectionString,
                StorageConnectionString,
                StorageContainerName);

            // Registers the Event Processor Host and starts receiving messages
            await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>();

            Console.WriteLine("Receiving. Press enter key to stop worker.");
            Console.ReadLine();

            // Disposes of the Event Processor Host
            await eventProcessorHost.UnregisterEventProcessorAsync();
        }