面試被問到Redis實現(xiàn)發(fā)布與訂閱,手摸手教
掃描二維碼
隨時隨地手機看文章
簡介
Redis發(fā)布與發(fā)布功能(Pub/Sub
)是基于事件座位基本的通信機制,是目前應(yīng)用比較普遍的通信模型,它的目的主要是解除消息的發(fā)布者與訂閱者之間的耦合關(guān)系。
Redis作為消息發(fā)布和訂閱之間的服務(wù)器,起到橋梁的作用,在Redis里面有一個channel
的概念,也就是頻道,發(fā)布者通過指定發(fā)布到某個頻道,然后只要有訂閱者訂閱了該頻道,該消息就會發(fā)送給訂閱者,原理圖如下所示:
Redis同時也可以使用list類型實現(xiàn)消息隊列(消息隊列的實現(xiàn)以及應(yīng)用場景會在下一篇文章繼續(xù)講解)。
Redis的發(fā)布與訂閱的功能應(yīng)用還是比較廣泛的,它的應(yīng)用場景有很多。比如:最常見的就是實現(xiàn)實時聊天的功能,還是有就是博客的粉絲文章的推送,當(dāng)博主推送原創(chuàng)文章的時候,就會將文章實時推送給博主的粉絲。
簡介完Redis的發(fā)布于訂閱功能,下面就要來實操一下,包括linux命令的實操和java代碼的實現(xiàn)。
命令實操
這里就假設(shè)各位讀者都已經(jīng)安裝好自己的虛擬機環(huán)境和Redis了,若是沒有安裝好的,可以參考這一篇博文:https://www.cnblogs.com/ zuidongfeng/p/8032505.html
我這里是已經(jīng)安裝好了Redis了,直接啟動我們的Redis,我已經(jīng)設(shè)置好了開機啟動,上面的那篇博文有講解怎么設(shè)置開機啟動。
發(fā)布消息
Redis中發(fā)布消息的命令是publish,具體使用如下所示:
PUBLISH test "haha":test
表示頻道的名稱,haha
表示發(fā)布的內(nèi)容,這樣就完成了一個一個消息的發(fā)布,后面的返回(integer)0
表示0人訂閱。
訂閱頻道
于此同時再啟動一個窗口,這個窗口作為訂閱者,訂閱者的命令subscribe
,使用SUBSCRIBE test就表示訂閱了test這個頻道訂閱后返回的結(jié)果中由三條信息,第一個表示類型、第二個表示訂閱的頻道,第三個表示訂閱的數(shù)量。接著在第一個窗口進行發(fā)布消息:
可以看到發(fā)布者發(fā)布的消息,訂閱者都會實時的接收到,并發(fā)訂閱者收到的信息中也會出現(xiàn)三條信息,分別表示:返回值的類型、頻道名稱、消息內(nèi)容。
取消訂閱
若是想取消之前的訂閱可以使用unsubscribe命令,格式為:
unsubscribe??頻道名稱
//?取消之前訂閱的test頻道
unsubscribe??test
輸入命令后,返回以下結(jié)果:
[root@pinyoyougou-docker?src]#?./redis-cli?
127.0.0.1:6379>?UNSUBSCRIBE?test
1)?"unsubscribe"
2)?"test"
3)?(integer)?0
它分別表示:返回值的類型、頻道的名稱、該頻道訂閱的數(shù)量。
按模式訂閱
除了直接以特定的名城進行訂閱,還可以按照模式進行訂閱,模式的方式進行訂閱可以一次訂閱多個頻道,按照模式進行訂閱的命令為psubscribe
,具體格式如下:
psubscribe??模式
//?表示訂閱名稱以ldc開頭的頻道
psubscribe??ldc*
輸入上面的命令后,返回如下結(jié)果:
127.0.0.1:6379>?PSUBSCRIBE?ldc*
Reading?messages...?(press?Ctrl-C?to?quit)
1)?"psubscribe"
2)?"ldc*"
3)?(integer)?1
這個也是非常簡單,分別表示:返回的類型(表示按模式訂閱類型)、訂閱的模式、訂閱數(shù)。
取消按模式訂閱
假如你想取消之前的按模式訂閱,可以使用punsubscribe
來取消,具體格式:
punsubscribe?模式
//?取消頻道名稱按照ldc開頭的頻道
punsubscribe?ldc*
他的返回值,如下所示:
127.0.0.1:6379>?PUNSUBSCRIBE?ldc*
1)?"punsubscribe"
2)?"ldc*"
3)?(integer)?0
這個就不多說了,表示的意思和上面的一樣,可以看到上面的命令都是有規(guī)律的訂閱SUBSCRIBE,取消就是UNSUBSCRIBE,前面加前綴UN,按模式訂閱也是。
查看訂閱消息
(1)你想查看某一個模式下訂閱數(shù)是大于零的頻道,可以使用如下格式的命令進行操作:
pubsub?channels?模式
//?查看頻道名稱以ldc模式開頭的訂閱數(shù)大于零的頻道
pubsub?channels?ldc*
(2)假如你想查看某一個頻道的訂閱數(shù),可以使用如下命令:
pubsub?numsub?頻道名稱
(3)查看按照模式的訂閱數(shù),可以使用如下命令進行操作:
pubsub?numpat
到這里以上的命令操作就基本結(jié)束了,下面就來代碼實戰(zhàn)。
代碼實練
(1)首先第一步想要操作Redis,再SpringBoot項目中引入jedis的依賴,畢竟jedis是官方推薦使用操作Redis的工具。
????redis.clients
????jedis
????2.9.0
(2)然后創(chuàng)建發(fā)布者Publisher
,用于消息的發(fā)布,具體代碼如下:
package?com.ldc.org.myproject.demo.redis;
import?java.io.BufferedReader;
import?java.io.IOException;
import?java.io.InputStreamReader;
import?redis.clients.jedis.Jedis;
import?redis.clients.jedis.JedisPool;
/**
?*?發(fā)布者
?*?@author?liduchang
?*
?*/
public?class?Publisher?extends?Thread{
?//?連接池?
?private?final?JedisPool?jedisPool;
?//?發(fā)布頻道名稱
?private?String?name;
?
?public?Publisher(JedisPool?jedisPool,?String?name)?{
??super();
??this.jedisPool?=?jedisPool;
??this.name?=?name;
?}
?
?@Override
?public?void?run()?{
??//?獲取要發(fā)布的消息
??BufferedReader?reader?=?new?BufferedReader(new?InputStreamReader(System.in));
??//?獲取連接
??Jedis?resource?=?jedisPool.getResource();
??while?(true)?{
???String?message?=?null;
???try?{
????message?=?reader.readLine();
????if?(!"exit".equals(message))?{
?????//?發(fā)布消息
?????resource.publish(name,?"發(fā)布者:"+Thread.currentThread().getName()+"發(fā)布消息:"+message);
????}?else?{
?????break;
????}
???}?catch?(IOException?e)?{
????e.printStackTrace();
???}
??}
?}
}
(3)接著創(chuàng)建訂閱類Subscriber
,并且繼承JedisPubSub
類,重寫onMessage、onSubscribe、onUnsubscribe
三個方法,這三個方法的調(diào)用時機在注釋上都有說明,具體的實現(xiàn)代碼如下:
package?com.ldc.org.myproject.demo.redis;
import?com.fasterxml.jackson.core.sym.Name;
import?redis.clients.jedis.JedisPubSub;
/**
?*?訂閱者
?*?@author?liduchang
?*/
public?class?Subscriber?extends?JedisPubSub?{
?//訂閱頻道名稱
?private?String?name;
?
?public?Subscriber(String?name)?{
??this.name?=?name;
?}
?/**
??*?訂閱者收到消息時會調(diào)用
??*/
?@Override
?public?void?onMessage(String?channel,?String?message)?{
??//?TODO?Auto-generated?method?stub
??super.onMessage(channel,?message);
??System.out.println("頻道:"+channel+"??接受的消息為:"+message);
?}
?/**
??*?訂閱了頻道會被調(diào)用
??*/
?@Override
?public?void?onSubscribe(String?channel,?int?subscribedChannels)?{
??System.out.println("訂閱了頻道:"+channel+"??訂閱數(shù)為:"+subscribedChannels);
?}
?/**
??*?取消訂閱頻道會被調(diào)用
??*/
?@Override
?public?void?onUnsubscribe(String?channel,?int?subscribedChannels)?{
??System.out.println("取消訂閱的頻道:"+channel+"??訂閱的頻道數(shù)量為:"+subscribedChannels);
?}
}
(4)這次創(chuàng)建的才是真正的訂閱者SubThread
,上面的Subscriber是指為了測試實訂閱的時候或者發(fā)布消息,能夠有信息輸出:
package?com.ldc.org.myproject.demo.redis;
import?redis.clients.jedis.Jedis;
import?redis.clients.jedis.JedisPool;
/**
?*?訂閱者線程
?*?@author?liduchang
?*
?*/
public?class?SubThread?extends?Thread?{
?
?private?final?JedisPool?jedisPool;
?
?private?final?Subscriber?subscriber;
?
?private?String?name;
?
?public?SubThread(JedisPool?jedisPool,Subscriber?subscriber,String?name)?{
??super();
??this.jedisPool?=?jedisPool;
??this.subscriber?=?subscriber;
??this.name?=?name;
?}
?
?@Override
?public?void?run()?{
??Jedis?jedis?=?null;
??try?{
???jedis?=?jedisPool.getResource();
???//?訂閱頻道為name
???jedis.subscribe(subscriber,?name);
??}?catch?(Exception?e)?{
???System.err.println("訂閱失敗");
??????e.printStackTrace();
??}?finally?{
???if?(jedis!=null)?{
?????//?jedis.close();
?????//歸還連接到redis池中
????jedisPool.returnResource(jedis);
???}
??}
?}
}
(5)后面就是測試了,分別測試發(fā)布與訂閱的測試,發(fā)布者為TestPublisher
,訂閱者為TestSubscriber
:
package?com.ldc.org.myproject.demo.redis;
import?java.util.concurrent.ExecutorService;
import?java.util.concurrent.Executors;
import?java.util.concurrent.TimeUnit;
import?redis.clients.jedis.JedisPool;
public?class?TestPublisher?{
?
?public?static?void?main(String[]?args)?throws?InterruptedException?{
??JedisPool?jedisPool?=?new?JedisPool("192.168.163.155");
??//?向ldc頻道發(fā)布消息
??Publisher?publisher?=?new?Publisher(jedisPool,?"ldc");
??publisher.start();
?}
}
訂閱者
package?com.ldc.org.myproject.demo.redis;
import?java.util.concurrent.ExecutorService;
import?java.util.concurrent.Executors;
import?java.util.concurrent.TimeUnit;
import?redis.clients.jedis.JedisPool;
public?class?TestSubscriber1?{
?
?public?static?void?main(String[]?args)?throws?InterruptedException?{
??JedisPool?jedisPool?=?new?JedisPool("192.168.163.155",6379);
??Subscriber?subscriber?=?new?Subscriber("黎杜");
??//?訂閱ldc頻道
??SubThread?thread=?new?SubThread(jedisPool,?subscriber,?"ldc");
??thread.start();
??Thread.sleep(600000);
??//?取消訂閱
??subscriber.unsubscribe("ldc");
?}
}
這里為了測試方便就直接創(chuàng)建線程的方式,更好的話可以使用線程池的方式通過線程池的submit
方法來執(zhí)行線程,若是不用了可以使用shutdown
方式關(guān)閉。
好了這一期的Redis的實現(xiàn)訂閱與發(fā)布的講解就說完了,我們下一期在講解Redis的集群的知識,下期再見。
特別推薦一個分享架構(gòu)+算法的優(yōu)質(zhì)內(nèi)容,還沒關(guān)注的小伙伴,可以長按關(guān)注一下:
長按訂閱更多精彩▼
如有收獲,點個在看,誠摯感謝
免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺僅提供信息存儲服務(wù)。文章僅代表作者個人觀點,不代表本平臺立場,如有問題,請聯(lián)系我們,謝謝!