段時間在使用MQTTnet,都說這個東西比較好,可是翻了翻網上沒有例子給參考一下。 今天算是找到了,給高手的帖子做個宣傳吧. 原網址如下:https://blog.csdn.net/chenlu5201314/article/details/94740765 由於GitHub上介紹的東西比較少,以我 ...
段時間在使用MQTTnet,都說這個東西比較好,可是翻了翻網上沒有例子給參考一下。
今天算是找到了,給高手的帖子做個宣傳吧.
原網址如下:https://blog.csdn.net/chenlu5201314/article/details/94740765
由於GitHub上介紹的東西比較少,以我的水平真是不知道怎麼用,先照葫蘆畫瓢,再看看怎麼回事吧:
功能:
把訂閱與發佈做成一個類,還帶有自動重連的功能
using System.Threading; using System.Threading.Tasks; using MQTTnet; using MQTTnet.Client; //客戶端需要用到 using MQTTnet.Client.Options; //具體連接時需要用到的屬性,ID的名稱,要連接Server的名稱,接入時用到的賬號和密碼,掉線時是否重新清除原有名稱,還有許多... using MQTTnet.Packets; //這個沒用上 using MQTTnet.Protocol; //這個也沒用上 using MQTTnet.Client.Receiving; //接收 using MQTTnet.Client.Disconnecting; //斷線 using MQTTnet.Client.Connecting; //連接
新建一個類:先寫一下變數和一些欄位
class HOSMQTT { private static MqttClient mqttClient = null; private static IMqttClientOptions options = null; private static bool runState = false; private static bool running = false; /// <summary> /// 伺服器IP /// </summary> private static string ServerUrl = "182.61.51.85"; /// <summary> /// 伺服器埠 /// </summary> private static int Port = 61613; /// <summary> /// 選項 - 開啟登錄 - 密碼 /// </summary> private static string Password = "ruichi8888"; /// <summary> /// 選項 - 開啟登錄 - 用戶名 /// </summary> private static string UserId = "admin"; /// <summary> /// 主題 /// <para>China/Hunan/Yiyang/Nanxian</para> /// <para>Hotel/Room01/Tv</para> /// <para>Hospital/Dept01/Room001/Bed001</para> /// <para>Hospital/#</para> /// </summary> private static string Topic = "China/Hunan/Yiyang/Nanxian"; /// <summary> /// 保留 /// </summary> private static bool Retained = false; /// <summary> /// 服務質量 /// <para>0 - 至多一次</para> /// <para>1 - 至少一次</para> /// <para>2 - 剛好一次</para> /// </summary> private static int QualityOfServiceLevel = 0; }
先看一下Start方法
public static void Start() { try { runState = true; Thread thread = new Thread(Work); //原帖中是這樣寫的 Thread thread = new Thread(new ThreadStart( Work)); thread.IsBackground = true; thread.Start(); } catch (Exception ex) { Console.WriteLine( "啟動客戶端出現問題:" + ex.ToString()); } }
沒進入正題之前,先普及一下基本知識
C#的ThreadStart 和 Thread 多線程,new Thread(t1);和new Thread(new ThreadStart(t1));沒有什麼區別.前者是.net的寫法,後者是C#的寫法
具體請看下麵的連接
https://www.cnblogs.com/rosesmall/p/8358348.html
進入整體,介紹連接方法 Work
private static void Work() { running = true; Console.WriteLine("Work >>Begin"); try { var factory = new MqttFactory(); //聲明一個MQTT客戶端的標準步驟 的第一步 mqttClient = factory.CreateMqttClient() as MqttClient; //factory.CreateMqttClient()實際是一個介面類型(IMqttClient),這裡是把他的類型變了一下 options = new MqttClientOptionsBuilder() //實例化一個MqttClientOptionsBulider
.WithTcpServer(ServerUrl, Port) .WithCredentials(UserId, Password) .WithClientId("XMan") .Build(); mqttClient.ConnectAsync(options); //連接伺服器
//下麵這些東西是什麼,為什麼要這麼寫,直到剛纔我還是不懂,不過在GitHub的網址我發現了出處. mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(new Func<MqttClientConnectedEventArgs, Task>(Connected)); mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(new Func<MqttClientDisconnectedEventArgs, Task>(Disconnected)); mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(new Action<MqttApplicationMessageReceivedEventArgs>(MqttApplicationMessageReceived)); while (runState) { Thread.Sleep(100); } } catch(Exception exp) { Console.WriteLine(exp); } Console.WriteLine("Work >>End"); running = false; runState = false; }
先來看看MqttClient 類裡面都有什麼東西
需要實現的介面,如何實現,說重點!
在GitHub上有個地方進去看看就知道了‘
這個頁面的最下方寫著如何實現 https://github.com/chkr1011/MQTTnet/wiki/Upgrading-guide
private void Something() { mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnAppMessage); mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnConnected); mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnDisconnected); } private async void OnAppMessage(MqttApplicationMessageReceivedEventArgs e) { } private async void OnConnected(MqttClientConnectedEventArgs e) { } private async void OnDisconnected(MqttClientDisconnectedEventArgs e) { }
在開始Connected方法之前有必要看一下關於同步和非同步的知識,
現學現賣簡單說一下:
Task就是非同步的調用,就在不影響主線程運行的另一個線程,但是他能像線程池一樣更高效的利用現有的空閑線程
async必須用來修飾Task ,void,或者Task<TResult>, await是等待非同步線程Task.Run()開始的後臺線程執行完畢。
記住要是Task 實現非同步功能,必須用 async 修飾,且async 與await成對出現。
詳見下麵大神寫的大作:https://www.cnblogs.com/doforfuture/p/6293926.html
下麵是什麼意思?
mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(new Func<MqttClientConnectedEventArgs, Task>(Connected));
MqttClientConnectedHandlerDelegate 這個實例實現了mqttClient.ConnectedHandler介面
new Func<MqttClientConnectedEventArgs, Task>(Connected) ,
使用Func委托傳入MqttClientConnectedEventArgs類型的參數,返回的類型是Task,Task是一個類,這個類沒有返回值,如果有返回值就是Task<TResult>。
是委托就要帶一個方法取實現,這個方法就是Connected。
這句話的意思是,用MqttClientConnectedHandlerDelegate實現介面,同時使用委托取調用Connected的方法,並且給這個方法傳入一個MqttClientConnectedEventArgs參數,
這個委托的返回值是Task(就是不需要返回類型的非同步調用),這也就定義了Connected的類型必須是async Task。
好了來看下 Connected,這個函數什麼意思
就是與伺服器連接之後要乾什麼,訂閱一個Topic,或幾個Topic。連接之前已經連接了Connectasync(),如果斷線還會重連,後面會提到。
這個就連接之後需要做的事----訂閱!
private static async Task Connected(MqttClientConnectedEventArgs e) { try { List<TopicFilter> listTopic = new List<TopicFilter>(); if (listTopic.Count() <= 0) { var topicFilterBulder = new TopicFilterBuilder().WithTopic(Topic).Build(); listTopic.Add(topicFilterBulder); Console.WriteLine("Connected >>Subscribe " + Topic); } await mqttClient.SubscribeAsync(listTopic.ToArray()); Console.WriteLine("Connected >>Subscribe Success"); } catch (Exception exp) { Console.WriteLine(exp.Message); } }
TopicFilter是一個Topic詳細信息的類
掉線的發生時會執行這個函數
private static async Task Disconnected(MqttClientDisconnectedEventArgs e) { try { Console.WriteLine("Disconnected >>Disconnected Server"); await Task.Delay(TimeSpan.FromSeconds(5)); try { await mqttClient.ConnectAsync(options); } catch (Exception exp) { Console.WriteLine("Disconnected >>Exception " + exp.Message); } } catch (Exception exp) { Console.WriteLine(exp.Message); } }
越寫問題越多,這個為什麼斷線的時候會執行這個方法,這不是事件,只是介面!
怎麼實現的?看了一下源碼,一時只看了大概,這些功能的綁定都是在ConnectAsync的時候就完成了!
下麵接收到消息的時候
/// <summary> /// 接收消息觸發事件 /// </summary> /// <param name="e"></param> private static void MqttApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e) { try { string text = Encoding.UTF8.GetString(e.ApplicationMessage.Payload); string Topic = e.ApplicationMessage.Topic; string QoS = e.ApplicationMessage.QualityOfServiceLevel.ToString(); string Retained = e.ApplicationMessage.Retain.ToString(); Console.WriteLine("MessageReceived >>Topic:" + Topic + "; QoS: " + QoS + "; Retained: " + Retained + ";"); Console.WriteLine("MessageReceived >>Msg: " + text); } catch (Exception exp) { Console.WriteLine(exp.Message); } }
最後就是發佈:一般會選擇0,如果選擇其他的情況在訂閱端不在的時候,伺服器可能會崩潰
/// <summary> /// /// 發佈 /// <paramref name="QoS"/> /// <para>0 - 最多一次</para> /// <para>1 - 至少一次</para> /// <para>2 - 僅一次</para> /// </summary> /// <param name="Topic">發佈主題</param> /// <param name="Message">發佈內容</param> /// <returns></returns> public static void Publish( string Topic,string Message) { try { if (mqttClient == null) return; if (mqttClient.IsConnected == false) mqttClient.ConnectAsync(options); if (mqttClient.IsConnected == false) { Console.WriteLine("Publish >>Connected Failed! "); return; } Console.WriteLine("Publish >>Topic: " + Topic + "; QoS: " + QualityOfServiceLevel + "; Retained: " + Retained + ";"); Console.WriteLine("Publish >>Message: " + Message); MqttApplicationMessageBuilder mamb = new MqttApplicationMessageBuilder() .WithTopic(Topic) .WithPayload(Message).WithRetainFlag(Retained); if (QualityOfServiceLevel == 0) { mamb = mamb.WithAtMostOnceQoS(); } else if (QualityOfServiceLevel == 1) { mamb = mamb.WithAtLeastOnceQoS(); } else if (QualityOfServiceLevel == 2) { mamb = mamb.WithExactlyOnceQoS(); } mqttClient.PublishAsync(mamb.Build()); } catch (Exception exp) { Console.WriteLine("Publish >>" + exp.Message); } }
紙上得來終覺淺,要改造成自己想要的些東西,還要花些功夫!不過這已經很好了!謝謝各位高手的貢獻