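How to send messages to Azure IoT Hub with the paho mqttv3 library
Azure IoT Hub supports several protocols, such as AMQP and MQTT, and several languages, such as Java and C. This means that, besides the official Azure IoT Hub SDKs, you can also use common community libraries to interact with Azure IoT Hub. This article demonstrates how to send messages to Azure IoT Hub with the paho mqttv3 library, and how to use property_bag to send messages that carry additional properties.
Add the dependencies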
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.0</version>
</dependency>
<!-- for base64 decode and encode -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
Generate the SAS token
The SAS token is used as the password when connecting to Azure IoT Hub over MQTT.
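// Imports used by the two methods below (they belong at the top of the source file).
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import org.apache.commons.codec.binary.Base64;

// Builds a SAS token for the given resource URI, e.g. {hostName}/devices/{deviceId}.
private static String generateSasToken(String resourceUri, String key) throws UnsupportedEncodingException {
    String tokenFormat = "SharedAccessSignature sig=%s&se=%s&sr=%s";
    String scope = URLEncoder.encode(resourceUri, StandardCharsets.UTF_8.name());
    // Expiry as Unix time in seconds: now + 600 s, plus 1 s of margin.
    long expiryTime = System.currentTimeMillis() / 1000L + 600 + 1L;
    String signatureStr = generateSignature(scope, expiryTime, key);
    return String.format(tokenFormat, signatureStr, expiryTime, scope);
}

// Signs "<encoded resource URI>\n<expiry>" with HMAC-SHA256, using the Base64-decoded
// device key, and returns the signature Base64-encoded and then URL-encoded.
private static String generateSignature(String resourceUri, long expiryTime, String deviceKey) throws UnsupportedEncodingException {
    byte[] rawSig = String.format("%s\n%s", resourceUri, expiryTime).getBytes(StandardCharsets.UTF_8);
    byte[] decodedDeviceKey = Base64.decodeBase64(deviceKey.getBytes(StandardCharsets.UTF_8));
    String hmacSha256 = "HmacSHA256";
    SecretKeySpec secretKey = new SecretKeySpec(decodedDeviceKey, hmacSha256);
    byte[] encryptedSig;
    try {
        Mac hMacSha256 = Mac.getInstance(hmacSha256);
        hMacSha256.init(secretKey);
        encryptedSig = hMacSha256.doFinal(rawSig);
    } catch (NoSuchAlgorithmException | InvalidKeyException e) {
        // Not expected with a hard-coded algorithm, but fail loudly instead of
        // continuing with a null signature.
        throw new IllegalStateException("Could not compute the HMAC-SHA256 signature", e);
    }
    byte[] encryptedSigBase64 = Base64.encodeBase64(encryptedSig);
    String utf8Sig = new String(encryptedSigBase64, StandardCharsets.UTF_8);
    return URLEncoder.encode(utf8Sig, StandardCharsets.UTF_8.name());
}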
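The signing steps above can also be tried out without the commons-codec dependency. The following is a minimal sketch, assuming Java 8 or later: it uses java.util.Base64 in place of commons-codec and takes the expiry time as a parameter so that the result is reproducible. The class name SasTokenSketch, the host name, the device ID and the key are hypothetical values chosen for illustration and are not part of the original sample.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical helper class; the name is illustrative only.
class SasTokenSketch {

    // Signs "<url-encoded resource URI>\n<expiry>" with HMAC-SHA256 and
    // assembles the token with the same field order as the code above.
    static String generate(String resourceUri, String base64Key, long expiryEpochSeconds) {
        try {
            String utf8 = StandardCharsets.UTF_8.name();
            String scope = URLEncoder.encode(resourceUri, utf8);
            byte[] toSign = (scope + "\n" + expiryEpochSeconds).getBytes(StandardCharsets.UTF_8);

            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(Base64.getDecoder().decode(base64Key), "HmacSHA256"));
            String sig = Base64.getEncoder().encodeToString(mac.doFinal(toSign));

            return String.format("SharedAccessSignature sig=%s&se=%d&sr=%s",
                    URLEncoder.encode(sig, utf8), expiryEpochSeconds, scope);
        } catch (GeneralSecurityException | UnsupportedEncodingException e) {
            throw new IllegalStateException("Could not sign the SAS token", e);
        }
    }

    public static void main(String[] args) {
        // Made-up key: the Base64 encoding of an arbitrary ASCII string.
        String key = Base64.getEncoder()
                .encodeToString("not-a-real-device-key".getBytes(StandardCharsets.UTF_8));
        long expiry = System.currentTimeMillis() / 1000L + 600;
        System.out.println(generate("myhub.azure-devices.net/devices/dev1", key, expiry));
    }
}
```

Passing the expiry in, rather than reading the clock inside the method, makes the token deterministic for a given input, which is convenient when comparing the output against another implementation.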
Connect to Azure IoT Hub
String hostName = ""; // e.g. iothubdemo.azure-devices.cn
String deviceId = "";
String deviceKey = "";
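// Azure IoT Hub accepts MQTT connections over TLS on port 8883.
String serverURI = "ssl://" + hostName + ":8883";
MqttAsyncClient client = new MqttAsyncClient(serverURI, deviceId, new MemoryPersistence());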
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(hostName + "/" + deviceId);
connOpts.setPassword(generateSasToken(hostName + "/devices/" + deviceId, deviceKey).toCharArray());
connOpts.setMaxInflight(100);
connOpts.setCleanSession(false);
connOpts.setAutomaticReconnect(true);
IMqttToken conToken = client.connect(connOpts);
conToken.waitForCompletion();
System.out.println("Connected to IoT Hub " + hostName);
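The token produced by generateSasToken is valid for roughly ten minutes, so a client that connects again later may need a fresh password. As a minimal sketch, assuming the token layout shown earlier (the sig, se and sr fields joined by &), the hypothetical helper below reads the se field and reports whether the token is within a safety margin of expiring. The class name, the method names and the 60-second margin are illustrative assumptions; they are not part of paho or of the original sample.

```java
// Hypothetical helper class; the name is illustrative only.
class SasExpirySketch {

    // Extracts the "se" field (expiry as Unix time in seconds) from a SAS token.
    static long expiryOf(String sasToken) {
        String prefix = "SharedAccessSignature ";
        if (!sasToken.startsWith(prefix)) {
            throw new IllegalArgumentException("Not a SAS token");
        }
        for (String field : sasToken.substring(prefix.length()).split("&")) {
            if (field.startsWith("se=")) {
                return Long.parseLong(field.substring("se=".length()));
            }
        }
        throw new IllegalArgumentException("SAS token has no se field");
    }

    // True when the token expires within marginSeconds of the given time.
    static boolean needsRefresh(String sasToken, long nowEpochSeconds, long marginSeconds) {
        return expiryOf(sasToken) - nowEpochSeconds <= marginSeconds;
    }

    public static void main(String[] args) {
        // Made-up token with a placeholder signature, for illustration only.
        String token = "SharedAccessSignature sig=abc%3D&se=1700000600"
                + "&sr=myhub.azure-devices.net%2Fdevices%2Fdev1";
        long now = System.currentTimeMillis() / 1000L;
        System.out.println("needs refresh: " + needsRefresh(token, now, 60));
    }
}
```

Before connecting again, a caller could check needsRefresh and, when it returns true, pass a newly generated token to connOpts.setPassword.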
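Send messages
// Device-to-cloud topic for this device.
String topic = "devices/" + deviceId + "/messages/events/";
// The message count of 10 is chosen for illustration.
for (int i = 0; i < 10; i++) {
    String msgStr = "message " + i;
    MqttMessage message = new MqttMessage(msgStr.getBytes(StandardCharsets.UTF_8));
    message.setQos(1);
    IMqttDeliveryToken token = client.publish(topic, message);
    // Waiting here serializes the sends: each publish blocks until it has completed.
    token.waitForCompletion();
    System.out.println("Sent message " + i);
}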
Use property_bag to send messages with additional properties
Add the property_bag to the topic
// demo for topic name with property_bag: devices/{device_id}/messages/events/{property_bag}
String topicProperty = URLEncoder.encode("testp", "UTF-8") + "=" + URLEncoder.encode("testv", "UTF-8");
String topicWithProperty = "devices/" + deviceId + "/messages/events/" + topicProperty;
Send the message to the topic that carries the property_bag
// Publish to the topic name that includes the property_bag.
// Every event published to this topic carries the property.
IMqttDeliveryToken delToken = client.publish(topicWithProperty, message);
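When a message needs more than one property, the same pattern extends to several name=value pairs joined by &. The following is a minimal sketch under that assumption, using only the JDK. The class name PropertyBagTopicSketch and the sample properties are hypothetical. Because URLEncoder writes a space as "+", the sketch rewrites it to "%20", on the assumption that the percent-encoded form is the safer choice inside a topic name.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper class; the name is illustrative only.
class PropertyBagTopicSketch {

    // URL-encodes one name or value; spaces become "%20" instead of "+".
    static String encode(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8").replace("+", "%20");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
    }

    // Builds devices/{deviceId}/messages/events/{property_bag}; with an empty
    // map the result is the plain device-to-cloud topic.
    static String topicFor(String deviceId, Map<String, String> properties) {
        StringBuilder topic = new StringBuilder("devices/")
                .append(deviceId).append("/messages/events/");
        boolean first = true;
        for (Map.Entry<String, String> p : properties.entrySet()) {
            if (!first) {
                topic.append('&');
            }
            topic.append(encode(p.getKey())).append('=').append(encode(p.getValue()));
            first = false;
        }
        return topic.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the insertion order of the properties.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("testp", "testv");
        props.put("room name", "lab 1");
        // prints devices/dev1/messages/events/testp=testv&room%20name=lab%201
        System.out.println(topicFor("dev1", props));
    }
}
```

The returned string can be passed to client.publish in place of topicWithProperty.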
Resources
Sample code: https://github.com/MicrosoftDocs/aog_codesample/tree/master/IotHub/Java/nature-mqtt-sample/