计划和广播作业 (Java)
使用 Azure IoT 中心来计划和跟踪可更新数百万台设备的作业。 使用作业可以:
- 更新所需属性
- 更新标记
- 调用直接方法
作业包装上述一项操作,并跟踪一组设备中的执行情况。 设备孪生查询定义作业执行的一组设备。 例如,后端应用可以使用作业在 10,000 台设备上调用直接方法来重启设备。 使用设备孪生查询指定设备集,并将作业计划为在以后运行。 每个设备接收和执行“重新启动”直接方法时,该作业都会跟踪进度。
若要详细了解其中的每项功能,请参阅:
设备孪生和属性:设备孪生入门和在 IoT 中心了解并使用设备孪生
注意
本文所述的功能只能用于 IoT 中心的标准层。 有关 IoT 中心基本层和标准/免费层的详细信息,请参阅选择适合你的解决方案的 IoT 中心层。
本文介绍如何创建两个 Java 应用:
一个设备应用(模拟设备),它实现了一个称为“lockDoor”的直接方法,后端应用可以调用该方法。
一个后端应用(计划作业),它创建了两个作业。 一个作业调用 lockDoor 直接方法,另一个作业将所需的属性更新发送到多个设备。
注意
有关可用于生成设备和后端应用的 SDK 工具的详细信息,请参阅 Azure IoT SDK。
先决条件
已注册的设备。 在 Azure 门户中注册一个。
Java SE 开发工具包 8。 请确保在“长期支持”下选择“Java 8”以获取 JDK 8 的下载。
确保已在防火墙中打开端口 8883。 本文中的设备示例使用 MQTT 协议,该协议通过端口 8883 进行通信。 在某些公司和教育网络环境中,此端口可能被阻止。 有关解决此问题的更多信息和方法,请参阅连接到 IoT 中心(MQTT)。
注意
为简单起见,本文不实现重试策略。 在生产代码中,应该按文章 Transient Fault Handling(暂时性故障处理)中所述实施重试策略(例如指数退避)。
获取 IoT 中心连接字符串
在本文中,你将创建一项后端服务,该服务计划用于在设备上调用直接方法的作业及计划用于更新设备孪生的作业,并监视每个作业的进度。 若要执行这些操作,服务需要“注册表读取”和“注册表写入”权限。 默认情况下,每个 IoT 中心都使用名为 registryReadWrite的共享访问策略创建,该策略会授予这些权限。
若要获取 registryReadWrite策略的 IoT 中心连接字符串,请执行以下步骤:
在 Azure 门户中,选择“资源组”。 选择中心所在的资源组,然后从资源列表中选择中心。
在中心的左侧窗格上,选择“共享访问策略”。
在策略列表中,选择“registryReadWrite”策略。
复制“主连接字符串”并保存该值。
有关 IoT 中心共享访问策略和权限的详细信息,请参阅访问控制和权限。
创建服务应用
本部分中将创建一个使用作业进行如下操作的 Java 控制台应用:
在多台设备上调用 lockDoor 直接方法。
向多台设备发送必需属性。
创建应用:
在开发计算机上,创建名为 iot-java-schedule-jobs 的空文件夹。
在 iot-java-schedule-jobs 文件夹中,通过命令提示符使用以下命令创建名为 schedule-jobs 的 Maven 项目。 请注意,这是一条很长的命令:
mvn archetype:generate -DgroupId=com.mycompany.app -DartifactId=schedule-jobs -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
在命令提示符下,导航到新的 schedule-jobs 文件夹。
使用文本编辑器打开 schedule-jobs 文件夹中的 pom.xml 文件,并在 dependencies 节点中添加以下依赖项 。 通过此依赖项可以使用应用中的 iot-service-client 包来与 IoT 中心通信:
<dependency> <groupId>com.microsoft.azure.sdk.iot</groupId> <artifactId>iot-service-client</artifactId> <version>1.17.1</version> <type>jar</type> </dependency>
注意
可以使用 Maven 搜索检查是否有最新版本的 iot-service-client。
在 dependencies 节点后添加以下 build 节点。 此配置指示 Maven 使用 Java 1.8 来生成应用:
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>
保存并关闭 pom.xml 文件。
使用文本编辑器打开 schedule-jobs\src\main\java\com\mycompany\app\App.java 文件。
在该文件中添加以下 import 语句:
import com.microsoft.azure.sdk.iot.service.devicetwin.DeviceTwinDevice; import com.microsoft.azure.sdk.iot.service.devicetwin.Pair; import com.microsoft.azure.sdk.iot.service.devicetwin.Query; import com.microsoft.azure.sdk.iot.service.devicetwin.SqlQuery; import com.microsoft.azure.sdk.iot.service.jobs.JobClient; import com.microsoft.azure.sdk.iot.service.jobs.JobResult; import com.microsoft.azure.sdk.iot.service.jobs.JobStatus; import java.util.Date; import java.time.Instant; import java.util.HashSet; import java.util.Set; import java.util.UUID;
将以下类级变量添加到 App 类。 将
{youriothubconnectionstring}
替换为以前在获取 IoT 中心连接字符串中复制的 IoT 中心连接字符串:public static final String iotHubConnectionString = "{youriothubconnectionstring}"; public static final String deviceId = "myDeviceId"; // How long the job is permitted to run without // completing its work on the set of devices private static final long maxExecutionTimeInSeconds = 30;
向 App 类添加以下方法,以安排作业更新设备孪生中的 Building 和 Floor 必需属性 :
private static JobResult scheduleJobSetDesiredProperties(JobClient jobClient, String jobId) { DeviceTwinDevice twin = new DeviceTwinDevice(deviceId); Set<Pair> desiredProperties = new HashSet<Pair>(); desiredProperties.add(new Pair("Building", 43)); desiredProperties.add(new Pair("Floor", 3)); twin.setDesiredProperties(desiredProperties); // Optimistic concurrency control twin.setETag("*"); // Schedule the update twin job to run now // against a single device System.out.println("Schedule job " + jobId + " for device " + deviceId); try { JobResult jobResult = jobClient.scheduleUpdateTwin(jobId, "deviceId='" + deviceId + "'", twin, new Date(), maxExecutionTimeInSeconds); return jobResult; } catch (Exception e) { System.out.println("Exception scheduling desired properties job: " + jobId); System.out.println(e.getMessage()); return null; } }
若要安排作业调用 lockDoor 方法,请向 App 类添加以下方法 :
private static JobResult scheduleJobCallDirectMethod(JobClient jobClient, String jobId) { // Schedule a job now to call the lockDoor direct method // against a single device. Response and connection // timeouts are set to 5 seconds. System.out.println("Schedule job " + jobId + " for device " + deviceId); try { JobResult jobResult = jobClient.scheduleDeviceMethod(jobId, "deviceId='" + deviceId + "'", "lockDoor", 5L, 5L, null, new Date(), maxExecutionTimeInSeconds); return jobResult; } catch (Exception e) { System.out.println("Exception scheduling direct method job: " + jobId); System.out.println(e.getMessage()); return null; } };
若要监视作业,请向 App 类添加以下方法:
private static void monitorJob(JobClient jobClient, String jobId) { try { JobResult jobResult = jobClient.getJob(jobId); if(jobResult == null) { System.out.println("No JobResult for: " + jobId); return; } // Check the job result until it's completed while(jobResult.getJobStatus() != JobStatus.completed) { Thread.sleep(100); jobResult = jobClient.getJob(jobId); System.out.println("Status " + jobResult.getJobStatus() + " for job " + jobId); } System.out.println("Final status " + jobResult.getJobStatus() + " for job " + jobId); } catch (Exception e) { System.out.println("Exception monitoring job: " + jobId); System.out.println(e.getMessage()); return; } }
若要查询已运行作业的详细信息,请添加以下方法:
private static void queryDeviceJobs(JobClient jobClient, String start) throws Exception { System.out.println("\nQuery device jobs since " + start); // Create a jobs query using the time the jobs started Query deviceJobQuery = jobClient .queryDeviceJob(SqlQuery.createSqlQuery("*", SqlQuery.FromType.JOBS, "devices.jobs.startTimeUtc > '" + start + "'", null).getQuery()); // Iterate over the list of jobs and print the details while (jobClient.hasNextJob(deviceJobQuery)) { System.out.println(jobClient.getNextJob(deviceJobQuery)); } }
更新 main 方法签名,以包含以下
throws
子句:public static void main( String[] args ) throws Exception
若要依次运行和监视两个作业,请将 main 方法中的代码替换为以下代码:
// Record the start time String start = Instant.now().toString(); // Create JobClient JobClient jobClient = JobClient.createFromConnectionString(iotHubConnectionString); System.out.println("JobClient created with success"); // Schedule twin job desired properties // Maximum concurrent jobs is 1 for Free and S1 tiers String desiredPropertiesJobId = "DPCMD" + UUID.randomUUID(); scheduleJobSetDesiredProperties(jobClient, desiredPropertiesJobId); monitorJob(jobClient, desiredPropertiesJobId); // Schedule twin job direct method String directMethodJobId = "DMCMD" + UUID.randomUUID(); scheduleJobCallDirectMethod(jobClient, directMethodJobId); monitorJob(jobClient, directMethodJobId); // Run a query to show the job detail queryDeviceJobs(jobClient, start); System.out.println("Shutting down schedule-jobs app");
保存并关闭 schedule-jobs\src\main\java\com\mycompany\app\App.java 文件
生成 schedule-jobs 应用并更正任何错误。 在命令提示符下,导航到 schedule-jobs 文件夹并运行以下命令:
mvn clean package -DskipTests
创建设备应用
本部分中将创建一个 Java 控制台应用,处理从 IoT 中心发送的必需属性并实现直接方法调用。
在命令提示符下使用以下命令,在 iot-java-schedule-jobs 文件夹中创建一个名为 simulated-device 的 Maven 项目。 请注意,这是一条很长的命令:
mvn archetype:generate -DgroupId=com.mycompany.app -DartifactId=simulated-device -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
在命令提示符下,导航到 simulated-device 文件夹。
使用文本编辑器,打开 simulated-device 文件夹中的 pom.xml 文件,并在 dependencies 节点中添加以下依赖项 。 通过此依赖项可以使用应用中的 iot-device-client 包来与 IoT 中心进行通信:
<dependency> <groupId>com.microsoft.azure.sdk.iot</groupId> <artifactId>iot-device-client</artifactId> <version>1.17.5</version> </dependency>
注意
可以使用 Maven 搜索检查是否有最新版本的 iot-device-client。
将以下依赖项添加到 dependencies 节点。 此依赖项为 Apache SLF4J 日志记录外观配置 NOP,设备客户端 SDK 使用它实现日志记录。 此配置是可选的,但是如果省略此配置,则在运行该应用时,可能会在控制台中看到警告。 有关设备客户端 SDK 中的日志记录的详细信息,请参阅适用于 Java 的 Azure IoT 设备 SDK 示例自述文件中的日志记录。
<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-nop</artifactId> <version>1.7.28</version> </dependency>
在 dependencies 节点后添加以下 build 节点。 此配置指示 Maven 使用 Java 1.8 来生成应用:
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>
保存并关闭 pom.xml 文件。
使用文本编辑器打开 simulated-device\src\main\java\com\mycompany\app\App.java 文件。
在该文件中添加以下 import 语句:
import com.microsoft.azure.sdk.iot.device.*; import com.microsoft.azure.sdk.iot.device.DeviceTwin.*; import java.io.IOException; import java.net.URISyntaxException; import java.util.Scanner;
将以下类级变量添加到 App 类。 将
{yourdeviceconnectionstring}
替换为在 IoT 中心注册设备时看到的设备连接字符串:private static String connString = "{yourdeviceconnectionstring}"; private static IotHubClientProtocol protocol = IotHubClientProtocol.MQTT; private static final int METHOD_SUCCESS = 200; private static final int METHOD_NOT_DEFINED = 404;
本示例应用在实例化 DeviceClient 对象时使用 protocol 变量。
若要在控制台中列显设备孪生通知,请向 App 类添加以下嵌套类:
// Handler for device twin operation notifications from IoT Hub protected static class DeviceTwinStatusCallBack implements IotHubEventCallback { public void execute(IotHubStatusCode status, Object context) { System.out.println("IoT Hub responded to device twin operation with status " + status.name()); } }
若要在控制台中列显直接方法通知,请向 App 类添加以下嵌套类:
// Handler for direct method notifications from IoT Hub protected static class DirectMethodStatusCallback implements IotHubEventCallback { public void execute(IotHubStatusCode status, Object context) { System.out.println("IoT Hub responded to direct method operation with status " + status.name()); } }
若要处理 IoT 中心的直接方法调用,请向 App 类添加以下嵌套类:
// Handler for direct method calls from IoT Hub protected static class DirectMethodCallback implements DeviceMethodCallback { @Override public DeviceMethodData call(String methodName, Object methodData, Object context) { DeviceMethodData deviceMethodData; switch (methodName) { case "lockDoor": { System.out.println("Executing direct method: " + methodName); deviceMethodData = new DeviceMethodData(METHOD_SUCCESS, "Executed direct method " + methodName); break; } default: { deviceMethodData = new DeviceMethodData(METHOD_NOT_DEFINED, "Not defined direct method " + methodName); } } // Notify IoT Hub of result return deviceMethodData; } }
更新 main 方法签名,以包含以下
throws
子句:public static void main( String[] args ) throws IOException, URISyntaxException
将 main 方法中的代码替换为以下代码,以便可:
- 创建用来与 IoT 中心通信的设备客户端。
- 创建一个 Device 对象用于存储设备孪生属性。
// Create a device client DeviceClient client = new DeviceClient(connString, protocol); // An object to manage device twin desired and reported properties Device dataCollector = new Device() { @Override public void PropertyCall(String propertyKey, Object propertyValue, Object context) { System.out.println("Received desired property change: " + propertyKey + " " + propertyValue); } };
若要启动设备客户端服务,请向 main 方法添加以下代码:
try { // Open the DeviceClient // Start the device twin services // Subscribe to direct method calls client.open(); client.startDeviceTwin(new DeviceTwinStatusCallBack(), null, dataCollector, null); client.subscribeToDeviceMethod(new DirectMethodCallback(), null, new DirectMethodStatusCallback(), null); } catch (Exception e) { System.out.println("Exception, shutting down \n" + " Cause: " + e.getCause() + " \n" + e.getMessage()); dataCollector.clean(); client.closeNow(); System.out.println("Shutting down..."); }
若要在关闭前等待用户按 Enter 键,请向 main 方法末尾添加以下代码 :
// Close the app System.out.println("Press any key to exit..."); Scanner scanner = new Scanner(System.in); scanner.nextLine(); dataCollector.clean(); client.closeNow(); scanner.close();
保存并关闭 simulated-device\src\main\java\com\mycompany\app\App.java 文件。
生成 simulated-device 应用并更正任何错误。 在命令提示符下,导航到 simulated-device 文件夹并运行以下命令:
mvn clean package -DskipTests
运行应用
现在可以运行控制台应用了。
在 simulated-device 文件夹中的命令提示符处,运行以下命令启动设备应用用于侦听所需属性更改和直接方法调用:
mvn exec:java -Dexec.mainClass="com.mycompany.app.App"
在
schedule-jobs
文件夹中的命令提示符处,运行以下命令以运行 schedule-jobs 服务应用,从而运行两个作业。 第一个作业设置所需的属性值,第二个作业调用直接方法:mvn exec:java -Dexec.mainClass="com.mycompany.app.App"
设备应用处理所需的属性更改和直接方法调用: