十年河東,十年河西,莫欺少年窮 學無止境,精益求精 netcore3.1控制台應用程式,引入MQTTnet 2.8版本 訂閱端: using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; ...
十年河東,十年河西,莫欺少年窮
學無止境,精益求精
netcore3.1控制台應用程式,引入MQTTnet 2.8版本
訂閱端:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using MQTTnet;
using MQTTnet.Server;
using MQTTnet.Client;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using MQTTnet.Protocol;

namespace swapConsole
{
    /// <summary>
    /// Console subscriber sample: connects to an MQTT broker, subscribes to a
    /// single topic once connected, and prints every received payload.
    /// </summary>
    class Program
    {
        private static MqttClient mqttClient = null;
        private static string topic = "test123ABC";

        // Generated once per process so that every reconnect presents the same
        // client id. The options below request a non-clean session, which the
        // broker can only resume when the client id stays the same.
        private static readonly string clientId = Guid.NewGuid().ToString();

        // Pause between reconnect attempts so an unreachable broker does not
        // turn the Disconnected handler into a busy retry loop.
        private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

        /// <summary>
        /// Builds the connection options (non-clean session, empty credentials,
        /// local broker on port 1883).
        /// </summary>
        private static IMqttClientOptions Options
        {
            get
            {
                MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder();
                builder.WithCleanSession(false);
                // User name and password
                builder.WithCredentials("", "");
                builder.WithClientId(clientId);
                builder.WithTcpServer("127.0.0.1", 1883);
                return builder.Build();
            }
        }

        /// <summary>
        /// Entry point: creates the client, wires the event handlers, connects,
        /// and keeps the process alive until a line is read from the console.
        /// </summary>
        /// <param name="args">Command line arguments (unused).</param>
        static async Task Main(string[] args)
        {
            MqttFactory factory = new MqttFactory();
            if (mqttClient == null)
            {
                mqttClient = (MqttClient)factory.CreateMqttClient();
                mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;
                mqttClient.Connected += MqttClient_Connected;
                mqttClient.Disconnected += async (s, e) =>
                {
                    Console.WriteLine("嘗試重連!" + Environment.NewLine);
                    // Back off before retrying; ConnectToServer handles its own exceptions.
                    await Task.Delay(ReconnectDelay);
                    await ConnectToServer();
                };
            }

            await ConnectToServer();
            Console.ReadLine();
        }

        /// <summary>
        /// Connects to the MQTT broker. Failures are reported on the console
        /// instead of being thrown.
        /// </summary>
        private static async Task ConnectToServer()
        {
            try
            {
                await mqttClient.ConnectAsync(Options);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"連接到MQTT伺服器失敗!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Raised after the client has connected; subscribes to the topic.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="e">Event data (unused).</param>
        private static void MqttClient_Connected(object sender, EventArgs e)
        {
            Console.WriteLine("已連接到MQTT伺服器!" + Environment.NewLine);
            SubscribeInfo();
        }

        /// <summary>
        /// Raised for every received application message; prints the payload as UTF-8 text.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="e">Event data carrying the received message.</param>
        private static void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
        {
            // Guard against a missing payload so decoding cannot throw on null.
            byte[] payload = e.ApplicationMessage.Payload ?? Array.Empty<byte>();
            Console.WriteLine($">> {Encoding.UTF8.GetString(payload)}{Environment.NewLine}");
        }

        /// <summary>
        /// Subscribes to the configured topic. Returns immediately; the outcome
        /// is reported on the console once the operation has finished.
        /// </summary>
        public static void SubscribeInfo()
        {
            // The helper catches and reports its own failures, so discarding the task is safe.
            _ = SubscribeInfoAsync();
        }

        /// <summary>
        /// Unsubscribes from the configured topic. Returns immediately; the
        /// outcome is reported on the console once the operation has finished.
        /// </summary>
        public static void UnSubscribeInfo()
        {
            // The helper catches and reports its own failures, so discarding the task is safe.
            _ = UnSubscribeInfoAsync();
        }

        /// <summary>
        /// Performs the subscription and reports success only after it completed.
        /// </summary>
        private static async Task SubscribeInfoAsync()
        {
            if (string.IsNullOrEmpty(topic))
            {
                Console.WriteLine("訂閱主題不能為空!");
                return;
            }

            if (!mqttClient.IsConnected)
            {
                Console.WriteLine("MQTT客戶端尚未連接!");
                return;
            }

            try
            {
                await mqttClient.SubscribeAsync(new List<TopicFilter>
                {
                    new TopicFilter(topic, MqttQualityOfServiceLevel.ExactlyOnce)
                });
                Console.WriteLine($"已訂閱[{topic}]主題" + Environment.NewLine);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"訂閱[{topic}]主題失敗!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Performs the unsubscription and reports success only after it completed.
        /// </summary>
        private static async Task UnSubscribeInfoAsync()
        {
            if (string.IsNullOrEmpty(topic))
            {
                Console.WriteLine("退訂主題不能為空!");
                return;
            }

            if (!mqttClient.IsConnected)
            {
                Console.WriteLine("MQTT客戶端尚未連接!");
                return;
            }

            try
            {
                await mqttClient.UnsubscribeAsync(topic);
                Console.WriteLine($"已退訂[{topic}]主題" + Environment.NewLine);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"退訂[{topic}]主題失敗!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }
    }
}
發佈端:
using MQTTnet;
using MQTTnet.Client;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace swapPublish
{
    /// <summary>
    /// Console publisher sample: connects to an MQTT broker and, each time a
    /// connection is established, publishes ten timestamp messages two seconds apart.
    /// </summary>
    class Program
    {
        private static MqttClient mqttClient = null;
        private static string topic = "test123ABC";

        // Generated once per process so that every reconnect presents the same
        // client id. The options below request a non-clean session, which the
        // broker can only resume when the client id stays the same.
        private static readonly string clientId = Guid.NewGuid().ToString();

        // Pause between reconnect attempts so an unreachable broker does not
        // turn the Disconnected handler into a busy retry loop.
        private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

        // Set right before the intentional disconnect in Main so that the
        // Disconnected handler and the publish loop stop instead of reconnecting.
        private static volatile bool shuttingDown;

        /// <summary>
        /// Builds the connection options (non-clean session, empty credentials,
        /// local broker on port 1883).
        /// </summary>
        private static IMqttClientOptions Options
        {
            get
            {
                MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder();
                builder.WithCleanSession(false);
                // User name and password
                builder.WithCredentials("", "");
                builder.WithClientId(clientId);
                builder.WithTcpServer("127.0.0.1", 1883);
                return builder.Build();
            }
        }

        /// <summary>
        /// Entry point: creates the client, wires the event handlers, connects,
        /// waits for a line on the console, then disconnects and exits.
        /// </summary>
        /// <param name="args">Command line arguments (unused).</param>
        static async Task Main(string[] args)
        {
            MqttFactory factory = new MqttFactory();
            if (mqttClient == null)
            {
                mqttClient = (MqttClient)factory.CreateMqttClient();
                mqttClient.Connected += MqttClient_Connected;
                mqttClient.Disconnected += async (s, e) =>
                {
                    if (shuttingDown)
                    {
                        // Intentional disconnect requested by Main: do not reconnect.
                        return;
                    }

                    Console.WriteLine("嘗試重連!" + Environment.NewLine);
                    // Back off before retrying; ConnectToServer handles its own exceptions.
                    await Task.Delay(ReconnectDelay);
                    await ConnectToServer();
                };
            }

            await ConnectToServer();

            // Publishing runs from the Connected handler; keep the process alive
            // until the user presses Enter.
            Console.ReadLine();

            shuttingDown = true;
            try
            {
                if (mqttClient.IsConnected)
                {
                    await mqttClient.DisconnectAsync();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message + Environment.NewLine);
            }

            // Printed only after the client has really been disconnected.
            Console.WriteLine("已斷開MQTT連接!" + Environment.NewLine);
        }

        /// <summary>
        /// Connects to the MQTT broker. Failures are reported on the console
        /// instead of being thrown.
        /// </summary>
        private static async Task ConnectToServer()
        {
            try
            {
                await mqttClient.ConnectAsync(Options);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"連接到MQTT伺服器失敗!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Raised after the client has connected; publishes ten messages with a
        /// two second pause after each one.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="e">Event data (unused).</param>
        private static async void MqttClient_Connected(object sender, EventArgs e)
        {
            Console.WriteLine("已連接到MQTT伺服器!" + Environment.NewLine);

            // async void is acceptable for an event handler, but every exception
            // must be caught here because nothing can observe the returned task.
            try
            {
                for (int i = 0; i < 10 && !shuttingDown; i++)
                {
                    await PublishInfo();
                    // Non-blocking pause: do not stall the thread that raised the event.
                    await Task.Delay(2000);
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine("發佈消息失敗!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Publishes the current local time (yyyy-MM-dd HH:mm:ss) to the configured
        /// topic with QoS "exactly once" and the retain flag cleared.
        /// </summary>
        private static async Task PublishInfo()
        {
            if (string.IsNullOrEmpty(topic))
            {
                Console.WriteLine("發佈主題不能為空!");
                return;
            }

            string inputString = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");

            MqttApplicationMessageBuilder builder = new MqttApplicationMessageBuilder();
            builder.WithPayload(Encoding.UTF8.GetBytes(inputString));
            builder.WithTopic(topic);
            builder.WithRetainFlag(false);
            builder.WithExactlyOnceQoS();

            await mqttClient.PublishAsync(builder.Build());
        }
    }
}
如何只允許一個客戶端消費同一個消息,暫時未解決!
如果大家有解決方法,請在評論中貼出,謝謝!
MQTTnet 3.0.16 版本的使用
客戶端:
using MQTTnet;
using MQTTnet.Adapter;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace mqttsub
{
    /// <summary>
    /// Console entry point for the MQTTnet 3.0.16 client sample.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Starts the client and keeps the process alive until a key is pressed.
        /// </summary>
        /// <param name="args">Command line arguments (unused).</param>
        static async Task Main(string[] args)
        {
            MqttClient mqtt = new MqttClient();
            await mqtt.StartAsync();
            Console.ReadKey();
        }
    }

    /// <summary>
    /// Thin wrapper around an MQTTnet client: connects with the settings held in
    /// a <see cref="MqttClientDto"/>, subscribes after every successful connect,
    /// prints received messages, and reconnects after a disconnect.
    /// </summary>
    public class MqttClient
    {
        // Pause between reconnect attempts so an unreachable broker does not
        // turn the disconnected handler into a busy retry loop.
        private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

        private IMqttClient client;
        private IMqttClientOptions options;
        MqttClientDto model = null;

        /// <summary>
        /// Initializes the connection settings used by <see cref="StartAsync"/>.
        /// </summary>
        public MqttClient()
        {
            model = new MqttClientDto
            {
                Account = "",
                PassWord = "",
                ClientId = Guid.NewGuid().ToString(),
                // NOTE(review): left empty in the sample; presumably must be set
                // to the broker address before connecting - confirm.
                IP = "",
                Port = 1883,
                // Wildcard filter: "+" matches exactly one topic level, so this
                // matches test/123/ABC and test/DDDDD/ABC, but not testABC.
                Topic = "test/+/ABC"
            };
        }

        /// <summary>
        /// Creates the client, registers the message/connected/disconnected
        /// handlers and connects to the broker. Connection failures are reported
        /// on the console instead of being thrown.
        /// </summary>
        public async Task StartAsync()
        {
            try
            {
                client = new MqttFactory().CreateMqttClient();

                options = new MqttClientOptionsBuilder()
                    // Use the id stored in the model so the id logged on connect
                    // is the one actually sent to the broker.
                    .WithClientId(model.ClientId)
                    // Login account
                    .WithCredentials(model.Account, model.PassWord)
                    // Broker address and port (the port argument is optional)
                    .WithTcpServer(model.IP, model.Port)
                    .WithCleanSession()
                    .Build();

                // Message received from the broker.
                // (Alternative registration style: client.UseApplicationMessageReceivedHandler(args => ...).)
                client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(MessageReceivedHandler);

                // Connected: topic subscriptions are made in the handler.
                // (Alternative registration style: client.UseConnectedHandler(args => ...).)
                client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(ConnectedHandler);

                // Disconnected: reconnection is implemented in the handler.
                // (Alternative registration style: client.UseDisconnectedHandler(args => ...).)
                client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(DisconnectedHandler);

                // The client can also publish, for example:
                //   await client.PublishAsync("topic", "payload");

                await client.ConnectAsync(options);
            }
            catch (MqttConnectingFailedException ex)
            {
                Console.WriteLine("身份校驗失敗");
                // The broker can refuse a connection for reasons other than bad
                // credentials; also print the library's message for the actual cause.
                Console.WriteLine(ex.Message);
            }
            catch (Exception ex)
            {
                Console.WriteLine("出現異常");
                Console.WriteLine(ex.Message);
            }
        }

        /// <summary>
        /// Called after the client has been disconnected; waits briefly and then
        /// tries to reconnect with the same options.
        /// </summary>
        /// <param name="obj">Details of the disconnect (unused).</param>
        private async void DisconnectedHandler(MqttClientDisconnectedEventArgs obj)
        {
            Console.WriteLine("本客戶端已經斷開連接");
            Console.WriteLine();

            // Back off first so repeated failures do not spin without pause.
            await Task.Delay(ReconnectDelay);

            try
            {
                await client.ConnectAsync(options);
            }
            catch (Exception ex)
            {
                Console.WriteLine("重連失敗");
                Console.WriteLine(ex.Message);
            }
        }

        /// <summary>
        /// Called after a successful connect; prints the connection settings and
        /// subscribes to the configured topic with QoS "exactly once".
        /// </summary>
        /// <param name="obj">Details of the connect result (unused).</param>
        private async void ConnectedHandler(MqttClientConnectedEventArgs obj)
        {
            Console.WriteLine("本客戶端已連接成功");
            Console.WriteLine($"地址:{model.IP}");
            Console.WriteLine($"埠:{model.Port}");
            Console.WriteLine($"客戶端:{model.ClientId}");
            Console.WriteLine($"賬號:{model.Account}");
            Console.WriteLine();

            // Other ways to subscribe:
            //   1) await client.SubscribeAsync("topic");
            //   3) build the request with MqttClientSubscribeOptionsBuilder.WithTopicFilter(...)
            // Option 2, an explicit list of topic filters, is used here.
            List<MqttTopicFilter> filters = new List<MqttTopicFilter>
            {
                new MqttTopicFilter
                {
                    Topic = model.Topic,
                    QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce
                }
            };

            // This handler is async void, so an exception escaping it could not be
            // observed by any caller; catch and report it here instead.
            try
            {
                await client.SubscribeAsync(filters.ToArray());
            }
            catch (Exception ex)
            {
                Console.WriteLine("訂閱主題失敗");
                Console.WriteLine(ex.Message);
            }
        }

        /// <summary>
        /// Called for every received message; prints its topic and UTF-8 payload.
        /// </summary>
        /// <param name="obj">Event data carrying the received message.</param>
        private void MessageReceivedHandler(MqttApplicationMessageReceivedEventArgs obj)
        {
            // Guard against a missing payload so decoding cannot throw on null.
            byte[] payload = obj.ApplicationMessage.Payload ?? Array.Empty<byte>();

            Console.WriteLine("===================================================");
            Console.WriteLine("收到消息:");
            Console.WriteLine($"主題:{obj.ApplicationMessage.Topic}");
            Console.WriteLine($"消息:{Encoding.UTF8.GetString(payload)}");
            Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++");
            Console.WriteLine();
        }
    }

    /// <summary>
    /// Connection settings for the MQTT client.
    /// </summary>
    public class MqttClientDto
    {
        /// <summary>
        /// Broker address.
        /// </summary>
        public string IP { get; set; }

        /// <summary>
        /// Login account.
        /// </summary>
        public string Account { get; set; }

        /// <summary>
        /// Login password.
        /// </summary>
        public string PassWord { get; set; }

        /// <summary>
        /// Client id sent to the broker.
        /// </summary>
        public string ClientId { get; set; }

        /// <summary>
        /// Broker TCP port.
        /// </summary>
        public int Port { get; set; }

        /// <summary>
        /// Topic filter to subscribe to.
        /// </summary>
        public string Topic { get; set; }
    }
}
服務端:
using MQTTnet; using MQTTnet.Client.Receiving; using MQTTnet.Protocol; using MQTTnet.Server; using System; using System.Net; using System.Text; using System.Threading.Tasks; namespace MqttPub { class Program { static async Task Main(string[] args) { await new ServerDome(). StartAsync(); Console.Read(); } } public class ServerDome { private IMqttServer server; MqttClientDto model = null; public ServerDome() { model = new MqttClientDto { Account = "", PassWord = "", ClientId = Guid.NewGuid().ToString(), IP = "", Port = 1883, Topic = "test" }; } public async Task StartAsync() { if (server == null || !server.IsStarted) { server = new MqttFactory().CreateMqttServer(); MqttServerOptionsBuilder serverOptions = new MqttServerOptionsBuilder(); //、預設監聽埠 serverOptions.WithDefaultEndpointPort(model.Port); //校驗客戶端信息 serverOptions.WithConnectionValidator(client => { string Account = client.Username; string PassWord = client.Password; string clientid = client.ClientId; if (Account == "" && PassWord == "") { client.ReasonCode = MqttConnectReasonCode.Success; Console.WriteLine("校驗成功"); } else { client.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; Console.WriteLine("校驗失敗"); } }); //客戶端發送消息監聽 server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(MessageReceivedHandler); //server.UseApplicationMessageReceivedHandler(args=>{ // Console.WriteLine("==================================================="); // Console.WriteLine("收到消息:"); // Console.WriteLine($"客戶端:{args.ClientId}"); // Console.WriteLine($"主題:{args.ApplicationMessage.Topic}"); // Console.WriteLine($"消息:{Encoding.UTF8.GetString(args.ApplicationMessage.Payload)}"); // Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++"); // Console.WriteLine(); //}); //客戶端連接事件 server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(ClientConnectedHandler); //server.UseClientConnectedHandler(args => //{ // Console.WriteLine($"{args.ClientId}此客戶端已經連接到伺服器"); 
//}); //客戶端斷開連接事件 server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(ClientDisconnectedHandler); //server.UseClientDisconnectedHandler(args => { // Console.WriteLine($"斷開連接的客戶端:{args.ClientId}"); // Console.WriteLine($"斷開連接類型:{args.DisconnectType.ToString()}"); //}); //客戶端訂閱主題事件 server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(ClientSubscribedTopicHandler); //客戶端取消訂閱主題事件 server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(ClientUnsubscribedTopicHandler); //伺服器啟動事件 server.StartedHandler = new MqttServerStartedHandlerDelegate(StartedHandler); //伺服器停止事件 server.StoppedHandler = new MqttServerStoppedHandlerDelegate(StoppedHandler); //服務端發送數據 //await server.PublishAsync("你想要的主題","你需要發送的東西"); //var mqttApplicationMessage = new MqttApplicationMessage(); //mqttApplicationMessage.Topic = "你想要的主題"; //mqttApplicationMessage.Payload = Encoding.ASCII.GetBytes("你需要發送的東西"); //await server.PublishAsync(mqttApplicationMessage); //啟動伺服器 await server.StartAsync(serverOptions.Build()); } } public async Task StopAsync() { if (server != null) { if (server.IsStarted) {