使用 Azure IoT 中心来计划和跟踪更新数百万台设备的作业。 使用作业可以:
- 更新所需属性
- 更新标记
- 调用直接方法
作业封装这些操作中的一个,并跟踪其在一组设备上的执行进度。 设备孪生查询定义作业所针对的设备集。 例如,后端应用可以使用作业在 10,000 台设备上调用用来重新启动设备的直接方法。 使用设备孪生查询语句指定设备集,并计划任务在未来的时间运行。 作业跟踪每个设备接收和执行重新启动直接方法的进度。
若要详细了解每项功能,请参阅:
设备孪生和属性:设备孪生入门 和 了解和使用 IoT 中枢中的设备孪生
注释
本文中所述的功能仅在 IoT 中心的标准层中可用。 有关基本层和标准/免费 IoT 中心层的详细信息,请参阅 为解决方案选择正确的 IoT 中心层和大小。
本文介绍如何创建两个 Java 应用:
一个设备应用 ,模拟设备,它实现一个名为 lockDoor 的直接方法,该方法可由后端应用调用。
一个后端应用, 计划作业,用于创建两个作业。 一个作业调用 lockDoor 直接方法,另一个作业将所需的属性更新发送到多个设备。
注释
有关可用于生成设备和后端应用的 SDK 工具的详细信息,请参阅 Azure IoT SDK 。
先决条件
已注册的设备。 在 Azure 门户中注册一个。
Java SE 开发工具包 8。 请确保在长期支持下选择 Java 8,以获取 JDK 8 的下载。
确保已在防火墙中打开端口 8883。 本文中的设备示例使用通过端口 8883 进行通信的 MQTT 协议。 某些企业和教育网络环境中可能会阻止此端口。 有关解决此问题的详细信息和方法,请参阅“连接到 IoT 中心”(MQTT)。
注释
为简单起见,本文不实现重试策略。 在生产代码中,应实施重试策略(如指数退避),如暂时 性故障处理一文中所述。
获取 IoT 中心连接字符串
在本文中,你将创建一个后端服务,该服务计划作业以在设备上调用直接方法,计划作业以更新设备孪生,并监视每个作业的进度。 若要执行这些作,服务需要 注册表读取 和 注册表写入 权限。 默认情况下,每个 IoT 中心都是使用名为 registryReadWrite 的共享访问策略创建的,该策略授予这些权限。
若要获取 registryReadWrite 策略的 IoT 中心连接字符串,请执行以下步骤:
在 Azure 门户中,选择 资源组。 选择中心所在的资源组,然后从资源列表中选择中心。
在中心的左侧窗格中,选择 “共享访问策略”。
从策略列表中,选择 registryReadWrite 策略。
复制“主连接字符串”并保存该值。
有关 IoT 中心共享访问策略和权限的详细信息,请参阅 访问控制和权限。
创建服务应用
在本部分中,你将创建一个 Java 控制台应用,该应用使用作业来:
在多个设备上调用 lockDoor 直接方法。
将所需属性发送到多个设备。
创建应用:
在开发计算机上,创建名为 iot-java-schedule-jobs 的空文件夹。
在 iot-java-schedule-jobs 文件夹中,在命令提示符处使用以下命令创建名为 schedule-jobs 的 Maven 项目。 请注意,这是一个单长命令:
mvn archetype:generate -DgroupId=com.mycompany.app -DartifactId=schedule-jobs -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false在命令提示符下,导航到 schedule-jobs 文件夹。
使用文本编辑器打开 schedule-jobs 文件夹中的pom.xml 文件,并将以下依赖项添加到依赖项节点。 通过此依赖项,可以使用应用中的 iot-service-client 包与 IoT 中心通信:
<dependency> <groupId>com.microsoft.azure.sdk.iot</groupId> <artifactId>iot-service-client</artifactId> <version>1.17.1</version> <type>jar</type> </dependency>注释
可以使用 Maven 搜索检查最新版本的 iot-service-client。
在依赖项节点之后添加以下生成节点。 此配置指示 Maven 使用 Java 1.8 生成应用:
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>保存并关闭 pom.xml 文件。
使用文本编辑器打开 schedule-jobs\src\main\java\com\mycompany\app\App.java 文件。
将以下 import 语句添加到该文件:
import com.microsoft.azure.sdk.iot.service.devicetwin.DeviceTwinDevice; import com.microsoft.azure.sdk.iot.service.devicetwin.Pair; import com.microsoft.azure.sdk.iot.service.devicetwin.Query; import com.microsoft.azure.sdk.iot.service.devicetwin.SqlQuery; import com.microsoft.azure.sdk.iot.service.jobs.JobClient; import com.microsoft.azure.sdk.iot.service.jobs.JobResult; import com.microsoft.azure.sdk.iot.service.jobs.JobStatus; import java.util.Date; import java.time.Instant; import java.util.HashSet; import java.util.Set; import java.util.UUID;将以下类级变量添加到 App 类。 将
{youriothubconnectionstring}替换为之前在获取 IoT Hub 连接字符串中复制的 IoT Hub 连接字符串:public static final String iotHubConnectionString = "{youriothubconnectionstring}"; public static final String deviceId = "myDeviceId"; // How long the job is permitted to run without // completing its work on the set of devices private static final long maxExecutionTimeInSeconds = 30;将以下方法添加到 App 类,以计划作业以更新设备孪生中的 Building 和 Floor 所需属性:
private static JobResult scheduleJobSetDesiredProperties(JobClient jobClient, String jobId) { DeviceTwinDevice twin = new DeviceTwinDevice(deviceId); Set<Pair> desiredProperties = new HashSet<Pair>(); desiredProperties.add(new Pair("Building", 43)); desiredProperties.add(new Pair("Floor", 3)); twin.setDesiredProperties(desiredProperties); // Optimistic concurrency control twin.setETag("*"); // Schedule the update twin job to run now // against a single device System.out.println("Schedule job " + jobId + " for device " + deviceId); try { JobResult jobResult = jobClient.scheduleUpdateTwin(jobId, "deviceId='" + deviceId + "'", twin, new Date(), maxExecutionTimeInSeconds); return jobResult; } catch (Exception e) { System.out.println("Exception scheduling desired properties job: " + jobId); System.out.println(e.getMessage()); return null; } }若要计划作业以调用 lockDoor 方法,请将以下方法添加到 App 类:
private static JobResult scheduleJobCallDirectMethod(JobClient jobClient, String jobId) { // Schedule a job now to call the lockDoor direct method // against a single device. Response and connection // timeouts are set to 5 seconds. System.out.println("Schedule job " + jobId + " for device " + deviceId); try { JobResult jobResult = jobClient.scheduleDeviceMethod(jobId, "deviceId='" + deviceId + "'", "lockDoor", 5L, 5L, null, new Date(), maxExecutionTimeInSeconds); return jobResult; } catch (Exception e) { System.out.println("Exception scheduling direct method job: " + jobId); System.out.println(e.getMessage()); return null; } };若要监视作业,请将以下方法添加到 App 类:
private static void monitorJob(JobClient jobClient, String jobId) { try { JobResult jobResult = jobClient.getJob(jobId); if(jobResult == null) { System.out.println("No JobResult for: " + jobId); return; } // Check the job result until it's completed while(jobResult.getJobStatus() != JobStatus.completed) { Thread.sleep(100); jobResult = jobClient.getJob(jobId); System.out.println("Status " + jobResult.getJobStatus() + " for job " + jobId); } System.out.println("Final status " + jobResult.getJobStatus() + " for job " + jobId); } catch (Exception e) { System.out.println("Exception monitoring job: " + jobId); System.out.println(e.getMessage()); return; } }若要查询运行作业的详细信息,请添加以下方法:
private static void queryDeviceJobs(JobClient jobClient, String start) throws Exception { System.out.println("\nQuery device jobs since " + start); // Create a jobs query using the time the jobs started Query deviceJobQuery = jobClient .queryDeviceJob(SqlQuery.createSqlQuery("*", SqlQuery.FromType.JOBS, "devices.jobs.startTimeUtc > '" + start + "'", null).getQuery()); // Iterate over the list of jobs and print the details while (jobClient.hasNextJob(deviceJobQuery)) { System.out.println(jobClient.getNextJob(deviceJobQuery)); } }更新 main 方法签名以包括以下
throws子句:public static void main( String[] args ) throws Exception若要按顺序运行和监视两个作业,请将 main 方法中的代码替换为以下代码:
// Record the start time String start = Instant.now().toString(); // Create JobClient JobClient jobClient = JobClient.createFromConnectionString(iotHubConnectionString); System.out.println("JobClient created with success"); // Schedule twin job desired properties // Maximum concurrent jobs is 1 for Free and S1 tiers String desiredPropertiesJobId = "DPCMD" + UUID.randomUUID(); scheduleJobSetDesiredProperties(jobClient, desiredPropertiesJobId); monitorJob(jobClient, desiredPropertiesJobId); // Schedule twin job direct method String directMethodJobId = "DMCMD" + UUID.randomUUID(); scheduleJobCallDirectMethod(jobClient, directMethodJobId); monitorJob(jobClient, directMethodJobId); // Run a query to show the job detail queryDeviceJobs(jobClient, start); System.out.println("Shutting down schedule-jobs app");保存并关闭 schedule-jobs\src\main\java\com\mycompany\app\App.java 文件
生成 计划作业 应用并更正任何错误。 在命令提示符下,导航到 计划作业 文件夹并运行以下命令:
mvn clean package -DskipTests
创建设备应用
在本部分中,你将创建一个 Java 控制台应用,用于处理从 IoT 中心发送的所需属性并实现直接方法调用。
在 iot-java-schedule-jobs 文件夹中,在命令提示符处使用以下命令创建名为 simulated-device 的 Maven 项目。 请注意,这是一个单长命令:
mvn archetype:generate -DgroupId=com.mycompany.app -DartifactId=simulated-device -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false在命令提示符下,导航到 simulated-device 文件夹。
使用文本编辑器打开 simulated-device 文件夹中的pom.xml 文件,并将以下依赖项添加到依赖项节点。 通过此依赖项,可以使用应用中的 iot-device-client 包与 IoT 中心通信:
<dependency> <groupId>com.microsoft.azure.sdk.iot</groupId> <artifactId>iot-device-client</artifactId> <version>1.17.5</version> </dependency>注释
可以使用 Maven 搜索检查最新版本的 iot-device-client。
将以下依赖项添加到 依赖项 节点。 此依赖项将 Apache SLF4J 日志记录外观配置为 NOP,设备客户端 SDK 使用它来进行日志记录。 此配置是可选的,但如果省略该配置,则运行应用时可能会在控制台中看到警告。 有关设备客户端 SDK 中日志记录的详细信息,请参阅适用于 Java 自述文件的 Azure IoT 设备 SDK 示例中的日志记录。
<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-nop</artifactId> <version>1.7.28</version> </dependency>在依赖项节点之后添加以下构建节点。 此配置指示 Maven 使用 Java 1.8 生成应用:
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>保存并关闭 pom.xml 文件。
使用文本编辑器打开 simulated-device\src\main\java\com\mycompany\app\App.java 文件。
将以下 import 语句添加到该文件:
import com.microsoft.azure.sdk.iot.device.*; import com.microsoft.azure.sdk.iot.device.DeviceTwin.*; import java.io.IOException; import java.net.URISyntaxException; import java.util.Scanner;将以下类级变量添加到 App 类。 替换为
{yourdeviceconnectionstring}在 IoT 中心注册设备时看到的设备连接字符串:private static String connString = "{yourdeviceconnectionstring}"; private static IotHubClientProtocol protocol = IotHubClientProtocol.MQTT; private static final int METHOD_SUCCESS = 200; private static final int METHOD_NOT_DEFINED = 404;此示例应用在实例化 DeviceClient 对象时使用协议变量。
若要将设备孪生通知打印到控制台,请将以下嵌套类添加到 App 类:
// Handler for device twin operation notifications from IoT Hub protected static class DeviceTwinStatusCallBack implements IotHubEventCallback { public void execute(IotHubStatusCode status, Object context) { System.out.println("IoT Hub responded to device twin operation with status " + status.name()); } }若要将直接方法通知打印到控制台,请将以下嵌套类添加到 App 类:
// Handler for direct method notifications from IoT Hub protected static class DirectMethodStatusCallback implements IotHubEventCallback { public void execute(IotHubStatusCode status, Object context) { System.out.println("IoT Hub responded to direct method operation with status " + status.name()); } }若要处理来自 IoT 中心的直接方法调用,请将以下嵌套类添加到 App 类:
// Handler for direct method calls from IoT Hub protected static class DirectMethodCallback implements DeviceMethodCallback { @Override public DeviceMethodData call(String methodName, Object methodData, Object context) { DeviceMethodData deviceMethodData; switch (methodName) { case "lockDoor": { System.out.println("Executing direct method: " + methodName); deviceMethodData = new DeviceMethodData(METHOD_SUCCESS, "Executed direct method " + methodName); break; } default: { deviceMethodData = new DeviceMethodData(METHOD_NOT_DEFINED, "Not defined direct method " + methodName); } } // Notify IoT Hub of result return deviceMethodData; } }更新 main 方法签名以包括以下
throws子句:public static void main( String[] args ) throws IOException, URISyntaxException将 main 方法中的代码替换为以下代码:
- 创建设备客户端以与 IoT 中心通信。
- 创建 Device 对象以存储设备孪生属性。
// Create a device client DeviceClient client = new DeviceClient(connString, protocol); // An object to manage device twin desired and reported properties Device dataCollector = new Device() { @Override public void PropertyCall(String propertyKey, Object propertyValue, Object context) { System.out.println("Received desired property change: " + propertyKey + " " + propertyValue); } };若要启动设备客户端服务,请将以下代码添加到 main 方法:
try { // Open the DeviceClient // Start the device twin services // Subscribe to direct method calls client.open(); client.startDeviceTwin(new DeviceTwinStatusCallBack(), null, dataCollector, null); client.subscribeToDeviceMethod(new DirectMethodCallback(), null, new DirectMethodStatusCallback(), null); } catch (Exception e) { System.out.println("Exception, shutting down \n" + " Cause: " + e.getCause() + " \n" + e.getMessage()); dataCollector.clean(); client.closeNow(); System.out.println("Shutting down..."); }若要等待用户在关闭前按 Enter 键,请将以下代码添加到 main 方法的末尾:
// Close the app System.out.println("Press any key to exit..."); Scanner scanner = new Scanner(System.in); scanner.nextLine(); dataCollector.clean(); client.closeNow(); scanner.close();保存并关闭 simulated-device\src\main\java\com\mycompany\app\App.java 文件。
生成 模拟设备 应用并更正任何错误。 在命令提示符下,导航到 simulated-device 文件夹并运行以下命令:
mvn clean package -DskipTests
运行应用
现在可以运行控制台应用了。
在 simulated-device 文件夹中的命令提示符下,运行以下命令以启动侦听所需属性更改和直接方法调用的设备应用:
mvn exec:java -Dexec.mainClass="com.mycompany.app.App"
在文件夹中的
schedule-jobs命令提示符下,运行以下命令以运行 计划作业 服务应用以运行两个作业。 第一个设置所需的属性值,第二个调用直接方法:mvn exec:java -Dexec.mainClass="com.mycompany.app.App"
设备应用处理所需的属性更改和直接方法调用: