|
@@ -0,0 +1,267 @@
|
|
|
+package com.fjhx.utils;
|
|
|
+
|
|
|
+import com.fjhx.MyMain;
|
|
|
+import org.eclipse.paho.client.mqttv3.*;
|
|
|
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
|
+
|
|
|
+import javax.crypto.Mac;
|
|
|
+import javax.crypto.spec.SecretKeySpec;
|
|
|
+import javax.net.SocketFactory;
|
|
|
+import javax.net.ssl.SSLContext;
|
|
|
+import javax.net.ssl.TrustManager;
|
|
|
+import javax.net.ssl.TrustManagerFactory;
|
|
|
+import java.io.FileInputStream;
|
|
|
+import java.io.IOException;
|
|
|
+import java.io.InputStream;
|
|
|
+import java.security.KeyStore;
|
|
|
+import java.security.SecureRandom;
|
|
|
+import java.time.Instant;
|
|
|
+import java.time.ZoneId;
|
|
|
+import java.time.ZonedDateTime;
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
|
+
|
|
|
+public class MqttClient {
|
|
|
+
|
|
|
+ private final int qosLevel = 1;
|
|
|
+ private MqttAsyncClient client;
|
|
|
+
|
|
|
+ private boolean connectFlag = false;
|
|
|
+
|
|
|
+ // mqtts加密连接
|
|
|
+ private final boolean isSSL = true;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * mqtt建链
|
|
|
+ */
|
|
|
+ public void connect() {
|
|
|
+
|
|
|
+ String url;
|
|
|
+ if (isSSL) {
|
|
|
+ url = "ssl://" + MyMain.config.getServerIp() + ":" + 8883; //mqtts连接
|
|
|
+ } else {
|
|
|
+ url = "tcp://" + MyMain.config.getServerIp() + ":" + 1883; //mqtt连接
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ MqttConnectOptions options = new MqttConnectOptions();
|
|
|
+ if (isSSL) {
|
|
|
+ options.setSocketFactory(getOptionSocketFactory(MyUtil.dir + "\\ca.jks"));
|
|
|
+ options.setHttpsHostnameVerificationEnabled(false);
|
|
|
+ }
|
|
|
+ options.setCleanSession(false);
|
|
|
+ options.setKeepAliveInterval(120);
|
|
|
+ options.setConnectionTimeout(5000);
|
|
|
+ options.setAutomaticReconnect(true);
|
|
|
+ options.setUserName(MyMain.config.getDeviceId());
|
|
|
+ options.setPassword(getPassword().toCharArray());
|
|
|
+
|
|
|
+ //设置MqttClient
|
|
|
+ client = new MqttAsyncClient(url, getClientId(), new MemoryPersistence());
|
|
|
+ client.setCallback(callback);
|
|
|
+ //建立连接
|
|
|
+ client.connect(options, null, new IMqttActionListener() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onSuccess(IMqttToken iMqttToken) {
|
|
|
+ connectFlag = true;
|
|
|
+
|
|
|
+ MyUtil.infoLog("连接成功");
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
|
|
|
+ MyUtil.errorLog("iot连接失败,正在尝试重新连接");
|
|
|
+
|
|
|
+ MyUtil.sleep(5 * 1000);
|
|
|
+ MqttClient.this.connect();
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+
|
|
|
+ while (!connectFlag) {
|
|
|
+ MyUtil.sleep(500L);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 上报json数据,注意serviceId要与Profile中的定义对应
|
|
|
+ */
|
|
|
+ public void publishMessage(String jsonMsg) {
|
|
|
+ MqttMessage message = new MqttMessage(jsonMsg.getBytes());
|
|
|
+ try {
|
|
|
+ client.publish(getReportTopic(), message, qosLevel, new IMqttActionListener() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onSuccess(IMqttToken iMqttToken) {
|
|
|
+// publishMessageCallback.onSuccess();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
|
|
|
+// publishMessageCallback.onFailure();
|
|
|
+ }
|
|
|
+
|
|
|
+ });
|
|
|
+ } catch (MqttException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Mqtt回调
|
|
|
+ */
|
|
|
+ public MqttCallback callback = new MqttCallbackExtended() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void connectComplete(boolean reconnect, String serviceURI) {
|
|
|
+ subScribeTopic();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void connectionLost(Throwable throwable) {
|
|
|
+ MyUtil.errorLog("Connection lost.");
|
|
|
+ //可在此处实现重连
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void messageArrived(String topic, MqttMessage message) {
|
|
|
+ MyUtil.errorLog("Receive mqtt topic:" + topic + ", message:" + message);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ };
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 订阅接收命令topic
|
|
|
+ */
|
|
|
+ public void subScribeTopic() {
|
|
|
+ try {
|
|
|
+ client.subscribe(getCmdRequestTopic(), qosLevel, null, new IMqttActionListener() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onSuccess(IMqttToken iMqttToken) {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
|
|
|
+ MyUtil.errorLog("Subscribe mqtt topic fail");
|
|
|
+ }
|
|
|
+
|
|
|
+ });
|
|
|
+ } catch (MqttException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 属性上报topic
|
|
|
+ */
|
|
|
+ private String getReportTopic() {
|
|
|
+ return "$oc/devices/" + MyMain.config.getDeviceId() + "/sys/properties/report";
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 订阅命令下发topic
|
|
|
+ */
|
|
|
+ private String getCmdRequestTopic() {
|
|
|
+ return "$oc/devices/" + MyMain.config.getDeviceId() + "/sys/commands/#";
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 加载SSL证书
|
|
|
+ *
|
|
|
+ * @param certPath 证书存放的相对路径
|
|
|
+ */
|
|
|
+ private SocketFactory getOptionSocketFactory(String certPath) {
|
|
|
+ SSLContext sslContext;
|
|
|
+
|
|
|
+ InputStream stream = null;
|
|
|
+ try {
|
|
|
+ stream = new FileInputStream(certPath);
|
|
|
+ sslContext = SSLContext.getInstance("TLS");
|
|
|
+ KeyStore ts = KeyStore.getInstance("JKS");
|
|
|
+ ts.load(stream, null);
|
|
|
+ TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
|
|
|
+ tmf.init(ts);
|
|
|
+ TrustManager[] tm = tmf.getTrustManagers();
|
|
|
+ sslContext.init(null, tm, new SecureRandom());
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ return null;
|
|
|
+ } finally {
|
|
|
+ if (stream != null) {
|
|
|
+ try {
|
|
|
+ stream.close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return sslContext.getSocketFactory();
|
|
|
+ }
|
|
|
+
|
|
|
+ /***
|
|
|
+ * 调用sha256算法进行哈希
|
|
|
+ */
|
|
|
+ private String sha256_mac(String message, String tStamp) {
|
|
|
+ String passWord = null;
|
|
|
+ try {
|
|
|
+ Mac sha256_HMAC = Mac.getInstance("HmacSHA256");
|
|
|
+ SecretKeySpec secret_key = new SecretKeySpec(tStamp.getBytes(), "HmacSHA256");
|
|
|
+ sha256_HMAC.init(secret_key);
|
|
|
+ byte[] bytes = sha256_HMAC.doFinal(message.getBytes());
|
|
|
+ passWord = byteArrayToHexString(bytes);
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ return passWord;
|
|
|
+ }
|
|
|
+
|
|
|
+ /***
|
|
|
+ * byte数组转16进制字符串
|
|
|
+ */
|
|
|
+ private String byteArrayToHexString(byte[] b) {
|
|
|
+ StringBuilder hs = new StringBuilder();
|
|
|
+ String stmp;
|
|
|
+ for (int n = 0; b != null && n < b.length; n++) {
|
|
|
+ stmp = Integer.toHexString(b[n] & 0XFF);
|
|
|
+ if (stmp.length() == 1) {
|
|
|
+ hs.append('0');
|
|
|
+ }
|
|
|
+ hs.append(stmp);
|
|
|
+ }
|
|
|
+ return hs.toString().toLowerCase();
|
|
|
+ }
|
|
|
+
|
|
|
+ /***
|
|
|
+ * 要求:10位数字
|
|
|
+ */
|
|
|
+ private String getTimeStamp() {
|
|
|
+ return ZonedDateTime.ofInstant(Instant.now(), ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
|
|
|
+ }
|
|
|
+
|
|
|
+ private String getClientId() {
|
|
|
+ return MyMain.config.getDeviceId() + "_0_0_" + getTimeStamp();
|
|
|
+ }
|
|
|
+
|
|
|
+ private String getPassword() {
|
|
|
+ return sha256_mac(MyMain.config.getSecret(), getTimeStamp());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void close() {
|
|
|
+ try {
|
|
|
+ client.disconnect();
|
|
|
+ client.close();
|
|
|
+ } catch (MqttException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+}
|