1.Linux上安裝Docken 伺服器系統版本以及內核版本:cat /etc/redhat-release 查看伺服器內核版本:uname -r 安裝依賴包:yum install -y yum-utils device-mapper-persistent-data lvm2 設置阿裡雲鏡像源:y ...
1.Linux上安裝Docken
伺服器系統版本以及內核版本:cat /etc/redhat-release 查看伺服器內核版本:uname -r 安裝依賴包:yum install -y yum-utils device-mapper-persistent-data lvm2 設置阿裡雲鏡像源:yum-config-manager --add-repo https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo 安裝Docker:yum install -y docker-ce 社區版(Community Edition,縮寫為 CE) 企業版(Enterprise Edition,縮寫為 EE) 啟動docker並設置開機自啟: 啟動docker命令:systemctl start docker 設置開機自啟命令:systemctl enable docker 查看docker版本命令:docker version
刪除docker-ce命令:yum remove docker-ce
刪除鏡像、容器、配置文件等內容
rm -rf /var/lib/containerd
rm -rf /var/lib/docker
----------------------------------------通過docker help命令來查看更多的命令--------------------------------
docker search --鏡像名 搜索倉庫鏡像
docker pull --鏡像名 拉取鏡像
docker ps 查看目前正在運行的所有容器 (-a 顯示包括已經停止的容器)
docker rmi image_id/image_name 刪除鏡像
docker build 使用Dockerfile創建鏡像
docker run 運行容器
docker exec 進入容器中執行命令 (例如:docker exec -it container_id/container_name /bin/bash)
docker logs container_id/container_name 查看容器日誌(例如:docker logs -f -t --tail 10 container_id )
docker start container_id/container_name 啟動容器
docker restart container_id/container_name 重啟容器
docker stop container_id/container_name 停止容器
docker rm container_id/container_name 刪除容器(只能刪除已停止的容器)
2基於Docken安裝RabbitMq
docker啟動:systemctl start docker
docker重啟:ystemctl restart docker
docker關閉:systemctl stop docker
查看正在運行容器:docker ps
查詢Rabbitmq鏡像: docker search rabbitmq
安裝Rabbitmq鏡像:
指定版本:docker pull rabbitmq:3.7.7-management
最新版本:docker pull rabbitmq
創建和啟動容器:docker run -d --hostname myrabbitmq --name rabbitmq -p 5672:5672 -p 15673:15672 rabbitmq
-d 後臺運行容器;
--hostname 主機名;
--name 指定容器名;
-p 指定服務運行的埠
5672 控制台Web埠號(服務端)
15672 應用訪問埠(客戶端)
-v 映射目錄或文件
-e 指定環境變數(RABBITMQ_DEFAULT_VHOST:預設虛擬機名;RABBITMQ_DEFAULT_USER:預設的用戶名;RABBITMQ_DEFAULT_PASS:預設用戶名的密碼)
進入容器內部:docker exec -it 容器id /bin/bash
運行:rabbitmq-plugins enable rabbitmq_management
重啟rabbitmq:docker start rabbitmq
重啟容器:docker restart rabbitmq
停止容器:docker stop rabbitmq
訪問:http://ip:15672/
賬號密碼:guest/guest
其它命令:
列出所有用戶:rabbitmqctl list_users
添加用戶:rabbitmqctl add_user username password 如:新增一個用戶:rabbitmqctl add_user 名稱 密碼
刪除用戶:rabbitmqctl delete_user username
修改密碼:rabbitmqctl change_password username newpassword
列出用戶許可權:rabbitmqctl list_user_permissions username
列出虛擬主機上的所有許可權:rabbitmqctl list_permissions -p vhostpath
設置用戶許可權:rabbitmqctl set_permissions -p vhostpath username “.” “.” “.*” 如:設置用戶許可權:rabbitmqctl set_permissions -p VHostPath User ConfP WriteP ReadP
3.添加用戶和設置許可權
4.NET中使用RabbitMQ
RabbitMq有7種模式:RabbitMQ Tutorials | RabbitMQ
安裝包:RabbitMQ.Client
定義隊列和交換機名稱
/// <summary> /// 定義隊列和交換機名稱 /// </summary> public class RabbitConstant { public const string QUEUE_HELLO_WORLD = "helloworld.queue"; public const string QUEUE_SMS = "sms.queue"; public const string EXCHANGE_WEATHER = "weather.exchange"; public const string QUEUE_BAIDU = "baidu.queue"; public const string QUEUE_SINA = "sina.queue"; public const string EXCHANGE_WEATHER_ROUTING = "weather.routing.exchange"; public const string EXCHANGE_WEATHER_TOPIC = "weather.topic.exchange"; }
第一種模式:Hello World
消費者:
using RabbitMQ.Client; using RabbitMQ.Client.Events; public class HelloConsumer { public static void HelloWorldShow() { var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.Port = 5672;//5672是RabbitMQ預設的埠號 factory.UserName = "admin"; factory.Password = "admin"; factory.VirtualHost = "my_vhost"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { /* * 創建隊列,聲明並創建一個隊列,如果隊列已存在,則使用這個隊列 * 第一個參數:隊列名稱ID * 第二個參數:是否持久化,false對應不持久化數據,MQ停掉數據就會丟失 * 第三個參數:是否隊列私有化,false則代表所有的消費者都可以訪問,true代表只有第一次擁有它的消費者才能一直使用 * 第四個:是否自動刪除,false代表連接停掉後不自動刪除這個隊列 * 其他額外參數為null */ channel.QueueDeclare(RabbitConstant.QUEUE_HELLO_WORLD, true, false, false, null); Console.ForegroundColor = ConsoleColor.Cyan; //事件消費者類 EventingBasicConsumer consumers = new EventingBasicConsumer(channel); // 觸發事件 consumers.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); // false只是確認簽收當前的消息,設置為true的時候則代表簽收該消費者所有未簽收的消息 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine($"Consumer01接收消息:{message}"); }; /* * 從MQ伺服器中獲取數據 * 創建一個消息消費者 * 第一個參數:隊列名 * 第二個參數:是否自動確認收到消息,false代表手動確認消息,這是MQ推薦的做法 * 第三個參數:要傳入的IBasicConsumer介面 */ channel.BasicConsume(RabbitConstant.QUEUE_HELLO_WORLD, false, consumers); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } } } }
生產者:
using RabbitMQ.Client; public class HelloProducer { public static void HelloWorldShow() { var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1";//IP factory.Port = 5672;//埠 factory.UserName = "admin";//用戶名 factory.Password = "admin";//密碼 factory.VirtualHost = "my_vhost";//虛擬主機 // 獲取TCP 長連接 using (var connection = factory.CreateConnection()) { // 創建通信“通道”,相當於TCP中的虛擬連接 using (var channel = connection.CreateModel()) { /* * 創建隊列,聲明並創建一個隊列,如果隊列已存在,則使用這個隊列 * 第一個參數:隊列名稱ID * 第二個參數:是否持久化,false對應不持久化數據,MQ停掉數據就會丟失 * 第三個參數:是否隊列私有化,false則代表所有的消費者都可以訪問,true代表只有第一次擁有它的消費者才能一直使用 * 第四個:是否自動刪除,false代表連接停掉後不自動刪除這個隊列 * 其他額外參數為null */ channel.QueueDeclare(RabbitConstant.QUEUE_HELLO_WORLD, true, false, false, null); Console.ForegroundColor = ConsoleColor.Red; string message = "hello CodeMan 666";//要發送的數據 var body = Encoding.UTF8.GetBytes(message); /* * 第一個參數:exchange:交換機,暫時用不到,在進行發佈訂閱時才會用到 * 第二個參數:路由key * 第三個參數:額外的設置屬性 * 第四個參數:最後一個參數是要傳遞的消息位元組數組 */ channel.BasicPublish("", RabbitConstant.QUEUE_HELLO_WORLD, null, body); Console.WriteLine($"producer消息:{message}已發送"); } } } }
-------------------------------------------------------------------------------漂亮的分割線--------------------------------------------------------------------------------------------------
獲取ConnectionFactory 對象
/// <summary> /// RabbitMQ連接類 /// </summary> public class RabbitUtils { /// <summary> /// 獲取ConnectionFactory對象 /// </summary> /// <returns></returns> public static ConnectionFactory GetConnection() { var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1";//IP地址 factory.Port = 5672;//5672是RabbitMQ預設的埠號 factory.UserName = "admin";//用戶名 factory.Password = "admin";//密碼 factory.VirtualHost = "my_vhost";//虛擬主機 return factory; } }
/// <summary> /// 發送消息內容類 /// </summary> public class Sms { public string Name { get; set; } public string Mobile { get; set; } public string Content { get; set; } public Sms() { } public Sms(string name, string mobile, string content) { Name = name; Mobile = mobile; Content = content; } }
第二種模式:Work Queues
消費者1
public class SmsReceive { public static void Sender() { var connection = RabbitUtils.GetConnection().CreateConnection(); var channel = connection.CreateModel(); /* * 創建隊列,聲明並創建一個隊列,如果隊列已存在,則使用這個隊列 * 第一個參數:隊列名稱ID * 第二個參數:是否持久化,false對應不持久化數據,MQ停掉數據就會丟失 * 第三個參數:是否隊列私有化,false則代表所有的消費者都可以訪問,true代表只有第一次擁有它的消費者才能一直使用 * 第四個:是否自動刪除,false代表連接停掉後不自動刪除這個隊列 * 其他額外參數為null */ channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null); // 如果不寫basicQos(1),則自動MQ會將所有請求平均發送給所有消費者 // basicQos,MQ不再對消費者一次發送多個請求,而是消費者處理完一個消息後(確認後),在從隊列中獲取一個新的 channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Thread.Sleep(30); Console.WriteLine($"SmsSender-發送簡訊成功:{message}"); channel.BasicAck(ea.DeliveryTag, false); }; channel.BasicConsume(RabbitConstant.QUEUE_SMS, false, consumer); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } }
消費者2
public class SmsReceive { public static void Sender() { var connection = RabbitUtils.GetConnection().CreateConnection(); var channel = connection.CreateModel(); channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null); // 如果不寫basicQos(1),則自動MQ會將所有請求平均發送給所有消費者 // basicQos,MQ不再對消費者一次發送多個請求,而是消費者處理完一個消息後(確認後),在從隊列中獲取一個新的 channel.BasicQos(0, 1, false);//處理完一個取一個 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Thread.Sleep(60); Console.WriteLine($"SmsSender-發送簡訊成功:{message}"); channel.BasicAck(ea.DeliveryTag, false); }; channel.BasicConsume(RabbitConstant.QUEUE_SMS, false, consumer); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } }
生產者
public class SmsSender { public static void Sender() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { /* * 創建隊列,聲明並創建一個隊列,如果隊列已存在,則使用這個隊列 * 第一個參數:隊列名稱ID * 第二個參數:是否持久化,false對應不持久化數據,MQ停掉數據就會丟失 * 第三個參數:是否隊列私有化,false則代表所有的消費者都可以訪問,true代表只有第一次擁有它的消費者才能一直使用 * 第四個:是否自動刪除,false代表連接停掉後不自動刪除這個隊列 * 其他額外參數為null */ channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null); for (int i = 0; i < 100; i++) { Sms sms = new Sms("乘客" + i, "139000000" + i, "您的車票已預定成功"); string jsonSms = JsonConvert.SerializeObject(sms); var body = Encoding.UTF8.GetBytes(jsonSms); /* * 第一個參數:exchange:交換機,暫時用不到,在進行發佈訂閱時才會用到 * 第二個參數:路由key * 第三個參數:額外的設置屬性 * 第四個參數:最後一個參數是要傳遞的消息位元組數組 */ channel.BasicPublish("", RabbitConstant.QUEUE_SMS, null, body); Console.WriteLine($"正在發送內容:{jsonSms}"); } Console.WriteLine("發送數據成功"); } } } }
第三種模式:Publish/Subscribe
消費者1
public class WeatherFanout { public static void Weather() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { //交換機 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER, ExchangeType.Fanout); // 聲明隊列信息 channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null); /* * queueBind 用於將隊列與交換機綁定 * 參數1:隊列名 * 參數2:交換機名 * 參數3:路由Key(暫時用不到) */ channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, ""); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); consumer.Received += ((model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"百度收到的氣象信息:{message}"); channel.BasicAck(ea.DeliveryTag, false); }); channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } } } }
消費者2
public class WeatherFanout { public static void Weather() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER, ExchangeType.Fanout); // 聲明隊列信息 channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null); /* * queueBind 用於將隊列與交換機綁定 * 參數1:隊列名 * 參數2:交換機名 * 參數3:路由Key(暫時用不到) */ channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, ""); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); consumer.Received += ((model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"百度收到的氣象信息:{message}"); channel.BasicAck(ea.DeliveryTag, false); }); channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } } } }
生產者
public class WeatherFanout { public static void Weather() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { string message = "20度"; var body = Encoding.UTF8.GetBytes(message); /* * 第一個參數:exchange:交換機,暫時用不到,在進行發佈訂閱時才會用到 * 第二個參數:路由key * 第三個參數:額外的設置屬性 * 第四個參數:最後一個參數是要傳遞的消息位元組數組 */ channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER, "", null, body); Console.WriteLine("天氣信息發送成功!"); } } } }
第三種模式:Routing
消費者1
public class WeatherDirect { public static void Weather() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { //交換機 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct); //隊列 channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null); /* * queueBind 用於將隊列與交換機綁定 * 參數1:隊列名 * 參數2:交換機名 * 參數3:路由Key(暫時用不到) */ channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.changsha.20210525"); channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20210525"); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); consumer.Received += ((model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"百度收到的氣象信息:{message}"); channel.BasicAck(ea.DeliveryTag, false); }); channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } } } }
消費者2
public class WeatherDirect { public static void Weather() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { //交換機 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct); // 聲明隊列信息 channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null); /* * queueBind 用於將隊列與交換機綁定 * 參數1:隊列名 * 參數2:交換機名 * 參數3:路由Key */ channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.xiangyang.20210525"); channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20210525"); channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20210525"); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); consumer.Received += ((model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"新浪收到的氣象信息:{message}"); channel.BasicAck(ea.DeliveryTag, false); }); channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer); Console.WriteLine("Press [Enter] to exit"); Console.Read(); } } } }
生產者
public class WeatherDirect { public static void Weather() { Dictionary<string, string> area = new Dictionary<string, string>(); area.Add("china.hunan.changsha.20210525", "中國湖南長沙20210525天氣數據"); area.Add("china.hubei.wuhan.20210525", "中國湖北武漢20210525天氣數據"); area.Add("china.hubei.xiangyang.20210525", "中國湖北襄陽20210525天氣數據"); area.Add("us.cal.lsj.20210525", "美國加州洛杉磯20210525天氣數據"); using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { foreach (var item in area) { /* * 第一個參數:exchange:交換機,暫時用不到,在進行發佈訂閱時才會用到 * 第二個參數:路由key * 第三個參數:額外的設置屬性 * 第四個參數:最後一個參數是要傳遞的消息位元組數組 */ channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, item.Key, null, Encoding.UTF8.GetBytes(item.Value)); } Console.WriteLine("氣象信息發送成功!"); } } } }
第五章模式:Topics
消費者1
public class WeatherTopic { public static void Weather() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { //交換機 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, ExchangeType.Topic); // 聲明隊列信息 channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null); /* * queueBind 用於將隊列與交換機綁定 * 參數1:隊列名 * 參數2:交換機名 * 參數3:路由Key(暫時用不到) */ channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.#");//有關china的所有信息 channel.BasicQos(0, 1, false