物聯網mqtt協議是可以發佈和訂閱通過java就可以實現 話不多說,mqtt,載入pom.xml文件格式 1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema- ...
物聯網mqtt協議是可以發佈和訂閱通過java就可以實現
話不多說,mqtt,載入pom.xml文件格式
1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 2 <modelVersion>4.0.0</modelVersion> 3 <groupId>com.mqtt</groupId> 4 <artifactId>MQTTmosquitto</artifactId> 5 <version>0.0.1-SNAPSHOT</version> 6 <dependencies> 7 <!-- MQTT依賴 --> 8 <dependency> 9 <groupId>org.eclipse.paho</groupId> 10 <artifactId>org.eclipse.paho.client.mqttv3</artifactId> 11 <version>1.0.2</version> 12 </dependency> 13 <!-- JDBC --> 14 <dependency> 15 <groupId>mysql</groupId> 16 <artifactId>mysql-connector-java</artifactId> 17 <version>5.1.39</version> 18 </dependency> 19 20 <!-- 阿裡巴巴JSON --> 21 <dependency> 22 <groupId>com.alibaba</groupId> 23 <artifactId>fastjson</artifactId> 24 <version>1.2.28</version> 25 </dependency> 26 27 28 </dependencies> 29 30 <build> 31 <plugins> 32 <plugin> 33 <artifactId>maven-compiler-plugin</artifactId> 34 <version>2.3.2</version> 35 <configuration> 36 <source>1.8</source> 37 <target>1.8</target> 38 </configuration> 39 </plugin> 40 <plugin> 41 <artifactId> maven-assembly-plugin </artifactId> 42 <configuration> 43 <descriptorRefs> 44 <descriptorRef>MQTT</descriptorRef> 45 </descriptorRefs> 46 <archive> 47 <manifest> 48 <mainClass>com.mqtt.ServerMQTT</mainClass> 49 </manifest> 50 </archive> 51 </configuration> 52 <executions> 53 <execution> 54 <id>make-assembly</id> 55 <phase>package</phase> 56 <goals> 57 <goal>single</goal> 58 </goals> 59 </execution> 60 </executions> 61 </plugin> 62 </plugins> 63 </build> 64 65 </project>
這塊是發佈信息代碼
1 package com.mqtt; 2 3 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 4 import org.eclipse.paho.client.mqttv3.MqttCallback; 5 import org.eclipse.paho.client.mqttv3.MqttClient; 6 import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 7 import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; 8 import org.eclipse.paho.client.mqttv3.MqttException; 9 import org.eclipse.paho.client.mqttv3.MqttMessage; 10 import org.eclipse.paho.client.mqttv3.MqttPersistenceException; 11 import org.eclipse.paho.client.mqttv3.MqttTopic; 12 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 13 14 public class ServerMQTT { 15 16 public static void main(String[] args) throws MqttException { 17 ServerMQTT server = new ServerMQTT(); 18 19 server.message = new MqttMessage(); 20 server.message.setQos(2); 21 server.message.setRetained(true); 22 //編輯消息 23 server.message.setPayload("你的肉".getBytes()); 24 server.publish(server.topic , server.message); 25 System.out.println(server.message.isRetained() + "------ratained狀態"); 26 } 27 28 //MQTT安裝的伺服器地址和埠號 29 public static final String HOST = "tcp://localhost:1883"; 30 //定義一個主題 31 public static final String TOPIC = "test"; 32 //定義MQTT的ID,可以在MQTT服務配置中指定 33 private static final String clientid = "client-1"; 34 35 private MqttClient client; 36 private MqttTopic topic;
//用戶和密碼 37 private String userName = "zhny"; 38 private String passWord = "zhny2020"; 39 private MqttMessage message; 40 41 /** 42 * g構造函數 43 */ 44 public ServerMQTT() throws MqttException { 45 // MemoryPersistence設置clientid的保存形式,預設為以記憶體保存 46 client = new MqttClient(HOST, clientid, new MemoryPersistence()); 47 connect(); 48 } 49 50 /** 51 * l連接伺服器 52 */ 53 private void connect() { 54 MqttConnectOptions options = new MqttConnectOptions(); 55 options.setCleanSession(false); 56 options.setUserName(userName); 57 options.setPassword(passWord.toCharArray()); 58 // 設置超時時間 59 options.setConnectionTimeout(10); 60 // 設置會話心跳時間 61 options.setKeepAliveInterval(20); 62 try { 63 client.setCallback(new MqttCallback() { 64 public void connectionLost(Throwable cause) { 65 // 連接丟失後,一般在這裡面進行重連 66 System.out.println("連接斷開……(可以做重連)"); 67 } 68 69 public void deliveryComplete(IMqttDeliveryToken token) { 70 System.out.println("deliveryComplete---------" + token.isComplete()); 71 } 72 73 public void messageArrived(String topic, MqttMessage message) throws Exception { 74 // subscribe後得到的消息會執行到這裡面 75 System.out.println("接收消息主題:" + topic + " 接收消息Qos:" + message.getQos() + "接收消息內容:" + new String(message.getPayload())); 76 } 77 }); 78 client.connect(options); 79 80 topic = client.getTopic(TOPIC); 81 } catch (Exception e) { 82 e.printStackTrace(); 83 } 84 } 85 86 /** 87 * t推送消息 88 */ 89 public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException, MqttException { 90 MqttDeliveryToken token = topic.publish(message); 91 token.waitForCompletion(); 92 System.out.println("測試成功為true失敗為false " + token.isComplete()); 93 } 94 95 96 97 }
訂閱接收代碼
1 package com.mqtt; 2 3 import org.eclipse.paho.client.mqttv3.MqttClient; 4 import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 5 import org.eclipse.paho.client.mqttv3.MqttException; 6 import org.eclipse.paho.client.mqttv3.MqttTopic; 7 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 8 9 /** 10 * MQTT工具類操作 11 * 12 */ 13 public class MQTTConnect { 14 15 16 public static void main(String[] args) throws MqttException { 17 MQTTConnect client = new MQTTConnect(); 18 client.start(); 19 } 20 21 22 23 24 25 //MQTT安裝的伺服器地址和埠號(本機的ip) 26 public static final String HOST = "tcp://localhost:1883"; 27 //定義一個主題 28 public static final String TOPIC = "test"; 29 //定義MQTT的ID,可以在MQTT服務配置中指定 30 private static final String clientid = "client-2"; 31 private MqttClient client; 32 private MqttConnectOptions options;
//用戶和密碼 33 private String userName = "zhny"; 34 private String passWord = "zhny2020"; 35 36 // private ScheduledExecutorService scheduler; 37 38 public void start() { 39 try { 40 // host為主機名,clientid即連接MQTT的客戶端ID,一般以唯一標識符表示,MemoryPersistence設置clientid的保存形式,預設為以記憶體保存 41 client = new MqttClient(HOST, clientid, new MemoryPersistence()); 42 // MQTT的連接設置 43 options = new MqttConnectOptions(); 44 // 設置是否清空session,這裡如果設置為false表示伺服器會保留客戶端的連接記錄,這裡設置為true表示每次連接到伺服器都以新的身份連接 45 options.setCleanSession(true); 46 // 設置連接的用戶名 47 options.setUserName(userName); 48 // 設置連接的密碼 49 options.setPassword(passWord.toCharArray()); 50 // 設置超時時間 單位為秒 51 options.setConnectionTimeout(10); 52 // 設置會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否線上,但這個方法並沒有重連的機制 53 options.setKeepAliveInterval(20); 54 55 // 設置回調,client.setCallback就可以調用PushCallback類中的messageArrived()方法 56 client.setCallback(new PushCallback()); 57 MqttTopic topic = client.getTopic(TOPIC); 58 59 int qos = 2; 60 61 //setWill方法,如果項目中需要知道客戶端是否掉線可以調用該方法。設置最終埠的通知消息 62 //options.setWill(topic, "This is yizhu...".getBytes(), qos, true); 63 client.connect(options); 64 //訂閱消息 65 int[] Qos = {qos}; 66 String[] topic1 = {TOPIC}; 67 client.subscribe(topic1, Qos); 68 69 } catch (Exception e) { 70 e.printStackTrace(); 71 } 72 } 73 74 } 75 76 77 78 79 80 81 82
1 package com.mqtt; 2 3 4 5 6 7 import java.text.SimpleDateFormat; 8 9 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 10 import org.eclipse.paho.client.mqttv3.MqttCallback; 11 import org.eclipse.paho.client.mqttv3.MqttMessage; 12 13 import com.alibaba.fastjson.JSONObject; 14 import com.jdbc.MySQLDemo; 15 16 17 18 /** 19 * 必須實現MqttCallback的介面並實現對應的相關介面方法CallBack 類將實現 MqttCallBack。 20 * 每個客戶機標識都需要一個回調實例。在此示例中,構造函數傳遞客戶機標識以另存為實例數據。 21 * 在回調中,將它用來標識已經啟動了該回調的哪個實例。 22 * 必須在回調類中實現三個方法: 23 * public void messageArrived(MqttTopic topic, MqttMessage message)接收已經預訂的發佈。 24 * public void connectionLost(Throwable cause)在斷開連接時調用。 25 * public void deliveryComplete(MqttDeliveryToken token)) 26 * 接收到已經發佈的 QoS 1 或 QoS 2 消息的傳遞令牌時調用。 27 * 由 MqttClient.connect 激活此回調。 28 */ 29 public class PushCallback implements MqttCallback{ 30 31 32 public void connectionLost(Throwable cause) { 33 // 連接丟失後,一般在這裡面進行重連 34 System.out.println("連接斷開……(可以做重連)"); 35 } 36 37 38 39 public void deliveryComplete(IMqttDeliveryToken token) { 40 System.out.println("deliveryComplete---------" + token.isComplete()); 41 } 42 43 44 public void messageArrived(String topic, MqttMessage message) throws Exception { 45 // subscribe後得到的消息會執行到這裡面 46 System.out.println("接收消息主題:" + topic + " 接收消息Qos:" + message.getQos() + "接收消息內容:" + new String(message.getPayload())); 47
//特別註意一下,messageArrived這個裡面不允許調用代碼和寫方法,如果調用就會斷開連接這個是個坑,然後用我這個try{}catch{}就沒有問題了,就不會再去掉線斷開連接
try { 48 JSONObject jsonObject = JSONObject.parseObject(message.toString()); 49 String equipment=jsonObject.getString("equipment"); 50 double temperature=Double.parseDouble(jsonObject.getString("temperature")); 51 double humidity=Double.parseDouble(jsonObject.getString("humidity")); 52 String datetime=jsonObject.getString("datetime"); 53 SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd hh-mm-ss"); 54 System.out.println(equipment); 55 56 MySQLDemo mysqldemo=new MySQLDemo(); 57 mysqldemo.add(equipment, temperature, humidity, datetime); 58 } catch (Exception e) { 59 e.printStackTrace(); 60 } 61 62 } 63 64 private void MqttPublisherFetch() { 65 // 66 // JSONObject jsonObject = JSONObject.parseObject(messtr); 67 // 68 // jsonObject.put("李鵬", "1111"); 69 // 70 // jsonObject.put("李12", "222"); 71 // 72 // System.out.println(jsonObject.toString()); 73 74 // System.out.println(equipment+"-------"+temperature+"---------------"+humidity+"-----------"+datetime+"-----------豬蹄---"); 75 } 76 77 78 } 79 80
特別註意一下,messageArrived這個裡面不允許調用代碼和寫方法,如果調用就會斷開連接這個是個坑,然後用我這個try{}catch{}就沒有問題了,就不會再去掉線斷開連接
簡單的一個小maven項目集成mqtt還有個jdbc簡單的也給大家粘上去
1 package com.jdbc; 2 3 import java.sql.*; 4 5 import com.alibaba.fastjson.JSONObject; 6 7 public class MySQLDemo { 8 9 // MySQL 8.0 以下版本 - JDBC 驅動名及資料庫 URL 10 static final String JDBC_DRIVER = "com.mysql.jdbc.Driver"; 11 static final String DB_URL = "jdbc:mysql://localhost:3306/zhny?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true"; 12 13 // MySQL 8.0 以上版本 - JDBC 驅動名及資料庫 URL 14 //static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver"; 15 //static final String DB_URL = "jdbc:mysql://localhost:3306/RUNOOB?useSSL=false&serverTimezone=UTC"; 16 17 18 // 資料庫的用戶名與密碼,需要根據自己的設置 19 static final String USER = "root"; 20 static final String PASS = "root"; 21 22 23 public void select() { 24 Connection conn = null; 25 Statement stmt = null; 26 try{ 27 // 註冊 JDBC 驅動 28 Class.forName(JDBC_DRIVER); 29 30 // 打開鏈接 31 System.out.println("連接資料庫..."); 32 conn = DriverManager.getConnection(DB_URL,USER,PASS); 33 34 35 // 執行查詢 36 System.out.println(" 實例化Statement對象..."); 37 stmt = conn.createStatement(); 38 String sql; 39 sql = "select * from crop"; 40 ResultSet rs = stmt.executeQuery(sql); 41 // 展開結果集資料庫 42 while(rs.next()){ 43 // 通過欄位檢索 44 int id = rs.getInt("id"); 45 String name = rs.getString("crop_name"); 46 47 // 輸出數據 48 System.out.print("ID: " + id); 49 System.out.print(", 站點名稱: " + name); 50 System.out.print("\n"); 51 } 52 // 完成後關閉 53 rs.close(); 54 stmt.close(); 55 conn.close(); 56 }catch(SQLException se){ 57 // 處理 JDBC 錯誤 58 se.printStackTrace(); 59 }catch(Exception e){ 60 // 處理 Class.forName 錯誤 61 e.printStackTrace(); 62 }finally{ 63 // 關閉資源 64 try{ 65 if(stmt!=null) stmt.close(); 66 }catch(SQLException se2){ 67 }// 什麼都不做 68 try{ 69 if(conn!=null) conn.close(); 70 }catch(SQLException se){ 71 se.printStackTrace(); 72 } 73 } 74 System.out.println("Goodbye!"); 75 } 76 77 public void add(String equipment,double temperature,double humidity,String datetime) { 78 79 80 Connection conn = null; 81 Statement stmt = null; 82 try{ 83 // 註冊 JDBC 驅動 84 Class.forName(JDBC_DRIVER); 85 86 // 打開鏈接 87 System.out.println("連接資料庫..."); 88 conn = DriverManager.getConnection(DB_URL,USER,PASS); 89 90 91 // 執行查詢 92 System.out.println(" 實例化Statement對象..."); 93 stmt = conn.createStatement(); 94 String sql = "INSERT INTO sensor (model,device_id,temperature,humidity,date_time,section_id)\r\n" + 95 "VALUES('"+equipment+"','"+temperature+"','"+temperature+"','"+humidity+"','"+datetime+"',12)";//感測器 96 int rs = stmt.executeUpdate(sql); 97 if (rs>0) { 98 System.out.println("true"+rs); 99 }else { 100 System.out.println("flase"+rs); 101 } 102 103 104 105 stmt.close(); 106 conn.close(); 107 }catch(SQLException se){ 108 // 處理 JDBC 錯誤 109 se.printStackTrace(); 110 }catch(Exception e){ 111 // 處理 Class.forName 錯誤 112 e.printStackTrace(); 113 }finally{ 114 // 關閉資源 115 try{ 116 if(stmt!=null) stmt.close(); 117 }catch(SQLException se2){ 118 }// 什麼都不做 119 try{ 120 if(conn!=null) conn.close(); 121 }catch(SQLException se){ 122 se.printStackTrace(); 123 } 124 } 125 System.out.println("Goodbye!"); 126 } 127 }
最後接收結果