生產者發送消息到隊列,然後隊列(RabbitMQ)把消息發送給消費者(消費者向 RabbitMQ 索取消息)。
先上代碼
namespace RabbitMQDemo
{
    /// <summary>
    /// "Hello World" RabbitMQ demo form: publishes the text typed by the user to one of
    /// two queues and shows, per queue, every message its consumer receives.
    /// </summary>
    public partial class HelloWorld : Form
    {
        /// <summary>Host name of the RabbitMQ broker used by both producer and consumers.</summary>
        private const string BrokerHost = "localhost";

        /// <summary>Queue read by consumer 1 (displayed in txtConsumer1).</summary>
        private const string queueName1 = "hello_queue1";

        /// <summary>Queue read by consumer 2 (displayed in txtConsumer2).</summary>
        private const string queueName2 = "hello_queue2";

        /// <summary>
        /// Long-lived consumer connections/channels, in creation order. They must outlive
        /// <see cref="ReceiveMsg"/> (so no <c>using</c> there) and are released when the form closes.
        /// </summary>
        private readonly List<IDisposable> _consumerResources = new List<IDisposable>();

        /// <summary>
        /// The single form instance.
        /// </summary>
        private static readonly HelloWorld _helloWorld;

        static HelloWorld()
        {
            _helloWorld = new HelloWorld();
        }

        /// <summary>
        /// Singleton accessor for the form.
        /// </summary>
        public static HelloWorld SingleForm
        {
            get { return _helloWorld; }
        }

        private HelloWorld()
        {
            // NOTE(review): this is a process-wide switch. This form marshals every UI update
            // onto the UI thread itself and does not depend on it; it is kept only because
            // other forms in the application may rely on the previous behavior.
            CheckForIllegalCrossThreadCalls = false;
            InitializeComponent();
        }

        /// <summary>
        /// Starts both consumers once the form is loaded. Starting them here rather than in the
        /// constructor guarantees a window handle exists before a delivery tries to marshal
        /// onto the UI thread.
        /// </summary>
        protected override void OnLoad(EventArgs e)
        {
            base.OnLoad(e);
            ReceiveMsg(queueName1);
            ReceiveMsg(queueName2);
        }

        /// <summary>
        /// Releases the consumer channels and connections when the form is closed.
        /// </summary>
        protected override void OnFormClosed(FormClosedEventArgs e)
        {
            // Dispose in reverse creation order so each channel goes before its connection.
            for (int i = _consumerResources.Count - 1; i >= 0; i--)
            {
                SafeDispose(_consumerResources[i]);
            }
            _consumerResources.Clear();

            base.OnFormClosed(e);
        }

        private void btnSendMsg_Click(object sender, EventArgs e)
        {
            SendMsg();
        }

        /// <summary>
        /// Publishes the text in txtPublisher to the queue selected in cbBoxQueues.
        /// Nothing is sent when the message is blank or no queue is selected.
        /// </summary>
        private void SendMsg()
        {
            string message = txtPublisher.Text;
            if (string.IsNullOrWhiteSpace(message))
            {
                MessageBox.Show("請輸入要發送的消息");
                return; // do not publish an empty message
            }

            // Convert.ToString yields "" for a null SelectedValue instead of throwing.
            string queueName = Convert.ToString(cbBoxQueues.SelectedValue);
            if (string.IsNullOrEmpty(queueName))
            {
                MessageBox.Show("請選擇要發送到的隊列");
                return;
            }

            try
            {
                var factory = new ConnectionFactory() { HostName = BrokerHost };
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    // Declaring is idempotent; both producer and consumer declare the queue
                    // so that either side may start first.
                    channel.QueueDeclare(queue: queueName,
                                         durable: false,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);

                    var body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchange: "",
                                         routingKey: queueName,
                                         basicProperties: null,
                                         body: body);
                }
            }
            catch (Exception ex)
            {
                // Same reporting style as the receive path (e.g. broker unreachable).
                MessageBox.Show(ex.ToString());
            }
        }

        /// <summary>
        /// Starts a consumer on <paramref name="queueName"/>. Each received message is
        /// decoded as UTF-8 and appended to the text box that belongs to that queue.
        /// </summary>
        /// <param name="queueName">Name of the queue to declare and consume from.</param>
        private void ReceiveMsg(string queueName)
        {
            IConnection connection = null;
            IModel channel = null;
            try
            {
                var factory = new ConnectionFactory() { HostName = BrokerHost };

                // No using here: the connection and channel must stay open for as long as
                // the consumer is expected to receive messages.
                connection = factory.CreateConnection();
                channel = connection.CreateModel();

                channel.QueueDeclare(queue: queueName,
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    string message = Encoding.UTF8.GetString(ea.Body);
                    PostReceivedMessage(queueName, message);
                };

                channel.BasicConsume(queue: queueName,
                                     noAck: true,
                                     consumer: consumer);

                _consumerResources.Add(connection);
                _consumerResources.Add(channel);
            }
            catch (Exception ex)
            {
                // Do not leak a half-initialized connection when setup fails part-way.
                SafeDispose(channel);
                SafeDispose(connection);
                MessageBox.Show(ex.ToString());
            }
        }

        /// <summary>
        /// Marshals a received message from the RabbitMQ consumer thread to the UI thread.
        /// Uses BeginInvoke so the consumer thread never blocks on the UI thread.
        /// </summary>
        private void PostReceivedMessage(string queueName, string message)
        {
            if (IsDisposed || !IsHandleCreated)
            {
                return;
            }

            try
            {
                BeginInvoke(new Action<string, string>(AppendReceivedMessage), queueName, message);
            }
            catch (InvalidOperationException)
            {
                // The form was closed between the check above and the call; the message
                // has nowhere to be displayed any more.
            }
        }

        /// <summary>
        /// Appends <paramref name="message"/> to the consumer text box that corresponds to
        /// <paramref name="queueName"/>. Must run on the UI thread.
        /// </summary>
        private void AppendReceivedMessage(string queueName, string message)
        {
            if (queueName == queueName1)
            {
                txtConsumer1.Text += string.Format("{0}\r\n", message);
            }
            else if (queueName == queueName2)
            {
                txtConsumer2.Text += string.Format("{0}\r\n", message);
            }
        }

        /// <summary>
        /// Best-effort disposal used during cleanup; a failure to close one resource must
        /// not prevent the remaining ones from being released.
        /// </summary>
        private static void SafeDispose(IDisposable resource)
        {
            if (resource == null)
            {
                return;
            }

            try
            {
                resource.Dispose();
            }
            catch (Exception)
            {
                // Deliberately ignored: the broker link may already be gone at shutdown.
            }
        }

        /// <summary>
        /// Fills the queue selector with the two demo queues.
        /// </summary>
        private void HelloWorld_Load(object sender, EventArgs e)
        {
            List<DataSource> lst = new List<DataSource>();
            lst.Add(new DataSource("消費者1", queueName1));
            lst.Add(new DataSource("消費者2", queueName2));

            cbBoxQueues.DataSource = lst;
            cbBoxQueues.DisplayMember = "DisplayMember";
            cbBoxQueues.ValueMember = "DisplayValue";
        }

        /// <summary>
        /// Combo-box item: a caption shown to the user and the queue name it stands for.
        /// The property names are bound by string in <see cref="HelloWorld_Load"/>.
        /// </summary>
        private class DataSource
        {
            public DataSource(string displayMember, string displayValue)
            {
                DisplayMember = displayMember;
                DisplayValue = displayValue;
            }

            /// <summary>Caption shown in the combo box.</summary>
            public string DisplayMember { get; set; }

            /// <summary>Queue name used as the combo box value.</summary>
            public string DisplayValue { get; set; }
        }
    }
}
界面如下:
大致流程是
生產者發送消息到隊列,然後隊列(RabbitMQ)把消息發送給消費者(消費者向 RabbitMQ 索取消息)。
兩個消費者: