Redis 作為一個publish/subscribe server,起到了消息路由的功能。訂閱者可以通過subscribe和psubscribe命令向Redis server訂閱自己感興趣的消息類型,當發佈者通過publish命令向Redis server發送特定類型的消息時。訂閱該消息類型的全部 ...
Redis 作為一個publish/subscribe server,起到了消息路由的功能。訂閱者可以通過subscribe和psubscribe命令向Redis server訂閱自己感興趣的消息類型,當發佈者通過publish命令向Redis server發送特定類型的消息時。訂閱該消息類型的全部client都會收到此消息。這裡消息的傳遞是多對多的。一個client可以訂閱多個channel,也可以向多個channel發送消息。
下圖為大家展示了Redis消息機制的體系架構。
發佈者和訂閱者都是Redis客戶端,Channel則為Redis伺服器端,發佈者將消息發送到某個的頻道,訂閱了這個頻道的訂閱者就能接收到這條消息。Redis的這種發佈訂閱機制與基於主題的發佈訂閱類似,Channel相當於主題。
下麵列出來了Redis發佈消息、訂閱消息的相關命令。
publish:
發送消息:Redis採用PUBLISH命令發送消息,其返回值為接收到該消息的訂閱者的數量。
subscribe:
訂閱某個頻道:Redis採用SUBSCRIBE命令訂閱某個頻道,其返回值包括客戶端訂閱的頻道,目前已訂閱的頻道數量,以及接收到的消息,其中subscribe表示已經成功訂閱了某個頻道。
psubscribe:
模式匹配:模式匹配功能允許客戶端訂閱符合某個模式的頻道,Redis採用PSUBSCRIBE訂閱符合某個模式所有頻道,用“”表示模式,“”可以被任意值代替。
案例一:一個消息生產者,兩個消息消費者
案例二:兩個消息生產者,一個消息消費者
案例三:Redis消息機制的Java API
添加依賴:
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.1.0</version> </dependency>
消息監聽器類:
import redis.clients.jedis.JedisPubSub; public class RedisMsgPubSubListener extends JedisPubSub { @Override public void unsubscribe() { super.unsubscribe(); } @Override public void unsubscribe(String... channels) { super.unsubscribe(channels); } @Override public void subscribe(String... channels) { super.subscribe(channels); } @Override public void psubscribe(String... patterns) { super.psubscribe(patterns); } @Override public void punsubscribe() { super.punsubscribe(); } @Override public void punsubscribe(String... patterns) { super.punsubscribe(patterns); } @Override public void onMessage(String channel, String message) { System.out.println("channel:" + channel + "receives message :" + message); this.unsubscribe(); } @Override public void onPMessage(String pattern, String channel, String message) { } @Override public void onSubscribe(String channel, int subscribedChannels) { System.out.println("channel:" + channel + "is been subscribed:" + subscribedChannels); } @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { } @Override public void onPSubscribe(String pattern, int subscribedChannels) { } @Override public void onUnsubscribe(String channel, int subscribedChannels) { System.out.println("channel:" + channel + "is been unsubscribed:" + subscribedChannels); } }
測試程式:
import redis.clients.jedis.Jedis; public class TestMain { @Test public void testSubscribe() throws Exception{ Jedis jedis = new Jedis("localhost"); RedisMsgPubSubListener listener = new RedisMsgPubSubListener(); jedis.subscribe(listener, "redisChatTest"); //other code } @Test public void testPublish() throws Exception{ Jedis jedis = new Jedis("localhost"); jedis.publish("redisChatTest", "Hello World"); Thread.sleep(5000); jedis.publish("redisChatTest", "Hello Redis"); } }