次の方法で共有

调度和发布作业(Java)

使用 Azure IoT 中心来计划和跟踪更新数百万台设备的作业。 使用作业可以:

  • 更新所需属性
  • 更新标记
  • 调用直接方法

作业封装这些操作中的一个,并跟踪其在一组设备上的执行进度。 设备孪生查询定义作业所针对的设备集。 例如,后端应用可以使用作业在 10,000 台设备上调用用来重新启动设备的直接方法。 使用设备孪生查询语句指定设备集,并计划任务在未来的时间运行。 作业跟踪每个设备接收和执行重新启动直接方法的进度。

若要详细了解每项功能,请参阅:

注释

本文中所述的功能仅在 IoT 中心的标准层中可用。 有关基本层和标准/免费 IoT 中心层的详细信息,请参阅 为解决方案选择正确的 IoT 中心层和大小

本文介绍如何创建两个 Java 应用:

  • 一个设备应用 ,模拟设备,它实现一个名为 lockDoor 的直接方法,该方法可由后端应用调用。

  • 一个后端应用, 计划作业,用于创建两个作业。 一个作业调用 lockDoor 直接方法,另一个作业将所需的属性更新发送到多个设备。

注释

有关可用于生成设备和后端应用的 SDK 工具的详细信息,请参阅 Azure IoT SDK

先决条件

  • IoT 中心。 使用 CLIAzure 门户创建一个。

  • 已注册的设备。 在 Azure 门户中注册一个。

  • Java SE 开发工具包 8。 请确保在长期支持下选择 Java 8,以获取 JDK 8 的下载。

  • Maven 3

  • 确保已在防火墙中打开端口 8883。 本文中的设备示例使用通过端口 8883 进行通信的 MQTT 协议。 某些企业和教育网络环境中可能会阻止此端口。 有关解决此问题的详细信息和方法,请参阅“连接到 IoT 中心”(MQTT)。

注释

为简单起见,本文不实现重试策略。 在生产代码中,应实施重试策略(如指数退避),如暂时 性故障处理一文中所述。

获取 IoT 中心连接字符串

在本文中,你将创建一个后端服务,该服务计划作业以在设备上调用直接方法,计划作业以更新设备孪生,并监视每个作业的进度。 若要执行这些作,服务需要 注册表读取注册表写入 权限。 默认情况下,每个 IoT 中心都是使用名为 registryReadWrite 的共享访问策略创建的,该策略授予这些权限。

若要获取 registryReadWrite 策略的 IoT 中心连接字符串,请执行以下步骤:

  1. Azure 门户中,选择 资源组。 选择中心所在的资源组,然后从资源列表中选择中心。

  2. 在中心的左侧窗格中,选择 “共享访问策略”。

  3. 从策略列表中,选择 registryReadWrite 策略。

  4. 复制“主连接字符串”并保存该值

    显示如何检索连接字符串的屏幕截图

有关 IoT 中心共享访问策略和权限的详细信息,请参阅 访问控制和权限

创建服务应用

在本部分中,你将创建一个 Java 控制台应用,该应用使用作业来:

  • 在多个设备上调用 lockDoor 直接方法。

  • 将所需属性发送到多个设备。

创建应用:

  1. 在开发计算机上,创建名为 iot-java-schedule-jobs 的空文件夹。

  2. iot-java-schedule-jobs 文件夹中,在命令提示符处使用以下命令创建名为 schedule-jobs 的 Maven 项目。 请注意,这是一个单长命令:

    mvn archetype:generate -DgroupId=com.mycompany.app -DartifactId=schedule-jobs -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
    
  3. 在命令提示符下,导航到 schedule-jobs 文件夹。

  4. 使用文本编辑器打开 schedule-jobs 文件夹中的pom.xml 文件,并将以下依赖项添加到依赖项节点。 通过此依赖项,可以使用应用中的 iot-service-client 包与 IoT 中心通信:

    <dependency>
      <groupId>com.microsoft.azure.sdk.iot</groupId>
      <artifactId>iot-service-client</artifactId>
      <version>1.17.1</version>
      <type>jar</type>
    </dependency>
    

    注释

    可以使用 Maven 搜索检查最新版本的 iot-service-client

  5. 依赖项节点之后添加以下生成节点。 此配置指示 Maven 使用 Java 1.8 生成应用:

    <build>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.3</version>
          <configuration>
            <source>1.8</source>
            <target>1.8</target>
          </configuration>
        </plugin>
      </plugins>
    </build>
    
  6. 保存并关闭 pom.xml 文件。

  7. 使用文本编辑器打开 schedule-jobs\src\main\java\com\mycompany\app\App.java 文件。

  8. 将以下 import 语句添加到该文件:

    import com.microsoft.azure.sdk.iot.service.devicetwin.DeviceTwinDevice;
    import com.microsoft.azure.sdk.iot.service.devicetwin.Pair;
    import com.microsoft.azure.sdk.iot.service.devicetwin.Query;
    import com.microsoft.azure.sdk.iot.service.devicetwin.SqlQuery;
    import com.microsoft.azure.sdk.iot.service.jobs.JobClient;
    import com.microsoft.azure.sdk.iot.service.jobs.JobResult;
    import com.microsoft.azure.sdk.iot.service.jobs.JobStatus;
    
    import java.util.Date;
    import java.time.Instant;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.UUID;
    
  9. 将以下类级变量添加到 App 类。 将{youriothubconnectionstring}替换为之前在获取 IoT Hub 连接字符串中复制的 IoT Hub 连接字符串:

    public static final String iotHubConnectionString = "{youriothubconnectionstring}";
    public static final String deviceId = "myDeviceId";
    
    // How long the job is permitted to run without
    // completing its work on the set of devices
    private static final long maxExecutionTimeInSeconds = 30;
    
  10. 将以下方法添加到 App 类,以计划作业以更新设备孪生中的 BuildingFloor 所需属性:

    private static JobResult scheduleJobSetDesiredProperties(JobClient jobClient, String jobId) {
      DeviceTwinDevice twin = new DeviceTwinDevice(deviceId);
      Set<Pair> desiredProperties = new HashSet<Pair>();
      desiredProperties.add(new Pair("Building", 43));
      desiredProperties.add(new Pair("Floor", 3));
      twin.setDesiredProperties(desiredProperties);
      // Optimistic concurrency control
      twin.setETag("*");
    
      // Schedule the update twin job to run now
      // against a single device
      System.out.println("Schedule job " + jobId + " for device " + deviceId);
      try {
        JobResult jobResult = jobClient.scheduleUpdateTwin(jobId, 
          "deviceId='" + deviceId + "'",
          twin,
          new Date(),
          maxExecutionTimeInSeconds);
        return jobResult;
      } catch (Exception e) {
        System.out.println("Exception scheduling desired properties job: " + jobId);
        System.out.println(e.getMessage());
        return null;
      }
    }
    
  11. 若要计划作业以调用 lockDoor 方法,请将以下方法添加到 App 类:

    private static JobResult scheduleJobCallDirectMethod(JobClient jobClient, String jobId) {
      // Schedule a job now to call the lockDoor direct method
      // against a single device. Response and connection
      // timeouts are set to 5 seconds.
      System.out.println("Schedule job " + jobId + " for device " + deviceId);
      try {
        JobResult jobResult = jobClient.scheduleDeviceMethod(jobId,
          "deviceId='" + deviceId + "'",
          "lockDoor",
          5L, 5L, null,
          new Date(),
          maxExecutionTimeInSeconds);
        return jobResult;
      } catch (Exception e) {
        System.out.println("Exception scheduling direct method job: " + jobId);
        System.out.println(e.getMessage());
        return null;
      }
    };
    
  12. 若要监视作业,请将以下方法添加到 App 类:

    private static void monitorJob(JobClient jobClient, String jobId) {
      try {
        JobResult jobResult = jobClient.getJob(jobId);
        if(jobResult == null)
        {
          System.out.println("No JobResult for: " + jobId);
          return;
        }
        // Check the job result until it's completed
        while(jobResult.getJobStatus() != JobStatus.completed)
        {
          Thread.sleep(100);
          jobResult = jobClient.getJob(jobId);
          System.out.println("Status " + jobResult.getJobStatus() + " for job " + jobId);
        }
        System.out.println("Final status " + jobResult.getJobStatus() + " for job " + jobId);
      } catch (Exception e) {
        System.out.println("Exception monitoring job: " + jobId);
        System.out.println(e.getMessage());
        return;
      }
    }
    
  13. 若要查询运行作业的详细信息,请添加以下方法:

    private static void queryDeviceJobs(JobClient jobClient, String start) throws Exception {
      System.out.println("\nQuery device jobs since " + start);
    
      // Create a jobs query using the time the jobs started
      Query deviceJobQuery = jobClient
          .queryDeviceJob(SqlQuery.createSqlQuery("*", SqlQuery.FromType.JOBS, "devices.jobs.startTimeUtc > '" + start + "'", null).getQuery());
    
      // Iterate over the list of jobs and print the details
      while (jobClient.hasNextJob(deviceJobQuery)) {
        System.out.println(jobClient.getNextJob(deviceJobQuery));
      }
    }
    
  14. 更新 main 方法签名以包括以下 throws 子句:

    public static void main( String[] args ) throws Exception
    
  15. 若要按顺序运行和监视两个作业,请将 main 方法中的代码替换为以下代码:

    // Record the start time
    String start = Instant.now().toString();
    
    // Create JobClient
    JobClient jobClient = JobClient.createFromConnectionString(iotHubConnectionString);
    System.out.println("JobClient created with success");
    
    // Schedule twin job desired properties
    // Maximum concurrent jobs is 1 for Free and S1 tiers
    String desiredPropertiesJobId = "DPCMD" + UUID.randomUUID();
    scheduleJobSetDesiredProperties(jobClient, desiredPropertiesJobId);
    monitorJob(jobClient, desiredPropertiesJobId);
    
    // Schedule twin job direct method
    String directMethodJobId = "DMCMD" + UUID.randomUUID();
    scheduleJobCallDirectMethod(jobClient, directMethodJobId);
    monitorJob(jobClient, directMethodJobId);
    
    // Run a query to show the job detail
    queryDeviceJobs(jobClient, start);
    
    System.out.println("Shutting down schedule-jobs app");
    
  16. 保存并关闭 schedule-jobs\src\main\java\com\mycompany\app\App.java 文件

  17. 生成 计划作业 应用并更正任何错误。 在命令提示符下,导航到 计划作业 文件夹并运行以下命令:

    mvn clean package -DskipTests
    

创建设备应用

在本部分中,你将创建一个 Java 控制台应用,用于处理从 IoT 中心发送的所需属性并实现直接方法调用。

  1. iot-java-schedule-jobs 文件夹中,在命令提示符处使用以下命令创建名为 simulated-device 的 Maven 项目。 请注意,这是一个单长命令:

    mvn archetype:generate -DgroupId=com.mycompany.app -DartifactId=simulated-device -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
    
  2. 在命令提示符下,导航到 simulated-device 文件夹。

  3. 使用文本编辑器打开 simulated-device 文件夹中的pom.xml 文件,并将以下依赖项添加到依赖项节点。 通过此依赖项,可以使用应用中的 iot-device-client 包与 IoT 中心通信:

    <dependency>
      <groupId>com.microsoft.azure.sdk.iot</groupId>
      <artifactId>iot-device-client</artifactId>
      <version>1.17.5</version>
    </dependency>
    

    注释

    可以使用 Maven 搜索检查最新版本的 iot-device-client

  4. 将以下依赖项添加到 依赖项 节点。 此依赖项将 Apache SLF4J 日志记录外观配置为 NOP,设备客户端 SDK 使用它来进行日志记录。 此配置是可选的,但如果省略该配置,则运行应用时可能会在控制台中看到警告。 有关设备客户端 SDK 中日志记录的详细信息,请参阅适用于 Java 自述文件的 Azure IoT 设备 SDK 示例中的日志记录

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-nop</artifactId>
      <version>1.7.28</version>
    </dependency>
    
  5. 依赖项节点之后添加以下构建节点。 此配置指示 Maven 使用 Java 1.8 生成应用:

    <build>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.3</version>
          <configuration>
            <source>1.8</source>
            <target>1.8</target>
          </configuration>
        </plugin>
      </plugins>
    </build>
    
  6. 保存并关闭 pom.xml 文件。

  7. 使用文本编辑器打开 simulated-device\src\main\java\com\mycompany\app\App.java 文件。

  8. 将以下 import 语句添加到该文件:

    import com.microsoft.azure.sdk.iot.device.*;
    import com.microsoft.azure.sdk.iot.device.DeviceTwin.*;
    
    import java.io.IOException;
    import java.net.URISyntaxException;
    import java.util.Scanner;
    
  9. 将以下类级变量添加到 App 类。 替换为 {yourdeviceconnectionstring} 在 IoT 中心注册设备时看到的设备连接字符串:

    private static String connString = "{yourdeviceconnectionstring}";
    private static IotHubClientProtocol protocol = IotHubClientProtocol.MQTT;
    private static final int METHOD_SUCCESS = 200;
    private static final int METHOD_NOT_DEFINED = 404;
    

    此示例应用在实例化 DeviceClient 对象时使用协议变量。

  10. 若要将设备孪生通知打印到控制台,请将以下嵌套类添加到 App 类:

    // Handler for device twin operation notifications from IoT Hub
    protected static class DeviceTwinStatusCallBack implements IotHubEventCallback {
      public void execute(IotHubStatusCode status, Object context) {
        System.out.println("IoT Hub responded to device twin operation with status " + status.name());
      }
    }
    
  11. 若要将直接方法通知打印到控制台,请将以下嵌套类添加到 App 类:

    // Handler for direct method notifications from IoT Hub
    protected static class DirectMethodStatusCallback implements IotHubEventCallback {
      public void execute(IotHubStatusCode status, Object context) {
        System.out.println("IoT Hub responded to direct method operation with status " + status.name());
      }
    }
    
  12. 若要处理来自 IoT 中心的直接方法调用,请将以下嵌套类添加到 App 类:

    // Handler for direct method calls from IoT Hub
    protected static class DirectMethodCallback
        implements DeviceMethodCallback {
      @Override
      public DeviceMethodData call(String methodName, Object methodData, Object context) {
        DeviceMethodData deviceMethodData;
        switch (methodName) {
          case "lockDoor": {
            System.out.println("Executing direct method: " + methodName);
            deviceMethodData = new DeviceMethodData(METHOD_SUCCESS, "Executed direct method " + methodName);
            break;
          }
          default: {
            deviceMethodData = new DeviceMethodData(METHOD_NOT_DEFINED, "Not defined direct method " + methodName);
          }
        }
        // Notify IoT Hub of result
        return deviceMethodData;
      }
    }
    
  13. 更新 main 方法签名以包括以下 throws 子句:

    public static void main( String[] args ) throws IOException, URISyntaxException
    
  14. main 方法中的代码替换为以下代码:

    • 创建设备客户端以与 IoT 中心通信。
    • 创建 Device 对象以存储设备孪生属性。
    // Create a device client
    DeviceClient client = new DeviceClient(connString, protocol);
    
    // An object to manage device twin desired and reported properties
    Device dataCollector = new Device() {
      @Override
      public void PropertyCall(String propertyKey, Object propertyValue, Object context)
      {
        System.out.println("Received desired property change: " + propertyKey + " " + propertyValue);
      }
    };
    
  15. 若要启动设备客户端服务,请将以下代码添加到 main 方法:

    try {
      // Open the DeviceClient
      // Start the device twin services
      // Subscribe to direct method calls
      client.open();
      client.startDeviceTwin(new DeviceTwinStatusCallBack(), null, dataCollector, null);
      client.subscribeToDeviceMethod(new DirectMethodCallback(), null, new DirectMethodStatusCallback(), null);
    } catch (Exception e) {
      System.out.println("Exception, shutting down \n" + " Cause: " + e.getCause() + " \n" + e.getMessage());
      dataCollector.clean();
      client.closeNow();
      System.out.println("Shutting down...");
    }
    
  16. 若要等待用户在关闭前按 Enter 键,请将以下代码添加到 main 方法的末尾:

    // Close the app
    System.out.println("Press any key to exit...");
    Scanner scanner = new Scanner(System.in);
    scanner.nextLine();
    dataCollector.clean();
    client.closeNow();
    scanner.close();
    
  17. 保存并关闭 simulated-device\src\main\java\com\mycompany\app\App.java 文件。

  18. 生成 模拟设备 应用并更正任何错误。 在命令提示符下,导航到 simulated-device 文件夹并运行以下命令:

    mvn clean package -DskipTests
    

运行应用

现在可以运行控制台应用了。

  1. simulated-device 文件夹中的命令提示符下,运行以下命令以启动侦听所需属性更改和直接方法调用的设备应用:

    mvn exec:java -Dexec.mainClass="com.mycompany.app.App"
    

    设备客户端启动

  2. 在文件夹中的 schedule-jobs 命令提示符下,运行以下命令以运行 计划作业 服务应用以运行两个作业。 第一个设置所需的属性值,第二个调用直接方法:

    mvn exec:java -Dexec.mainClass="com.mycompany.app.App"
    

    Java IoT 中心服务应用程序创建了两个作业

  3. 设备应用处理所需的属性更改和直接方法调用:

    设备客户端响应更改