當(dāng)前位置:首頁 > 公眾號精選 > 架構(gòu)師社區(qū)
[導(dǎo)讀]Redis發(fā)布與發(fā)布功能是基于事件座位基本的通信機制,是目前應(yīng)用比較普遍的通信模型,它的目的主要是解除消息的發(fā)布者與訂閱者之間的耦合關(guān)系。

面試被問到Redis實現(xiàn)發(fā)布與訂閱,手摸手教

簡介

Redis發(fā)布與發(fā)布功能(Pub/Sub)是基于事件座位基本的通信機制,是目前應(yīng)用比較普遍的通信模型,它的目的主要是解除消息的發(fā)布者與訂閱者之間的耦合關(guān)系

Redis作為消息發(fā)布和訂閱之間的服務(wù)器,起到橋梁的作用,在Redis里面有一個channel的概念,也就是頻道,發(fā)布者通過指定發(fā)布到某個頻道,然后只要有訂閱者訂閱了該頻道,該消息就會發(fā)送給訂閱者,原理圖如下所示:


面試被問到Redis實現(xiàn)發(fā)布與訂閱,手摸手教

Redis同時也可以使用list類型實現(xiàn)消息隊列(消息隊列的實現(xiàn)以及應(yīng)用場景會在下一篇文章繼續(xù)講解)。

Redis的發(fā)布與訂閱的功能應(yīng)用還是比較廣泛的,它的應(yīng)用場景有很多。比如:最常見的就是實現(xiàn)實時聊天的功能,還是有就是博客的粉絲文章的推送,當(dāng)博主推送原創(chuàng)文章的時候,就會將文章實時推送給博主的粉絲。

簡介完Redis的發(fā)布于訂閱功能,下面就要來實操一下,包括linux命令的實操和java代碼的實現(xiàn)。

命令實操

這里就假設(shè)各位讀者都已經(jīng)安裝好自己的虛擬機環(huán)境和Redis了,若是沒有安裝好的,可以參考這一篇博文:https://www.cnblogs.com/ zuidongfeng/p/8032505.html

我這里是已經(jīng)安裝好了Redis了,直接啟動我們的Redis,我已經(jīng)設(shè)置好了開機啟動,上面的那篇博文有講解怎么設(shè)置開機啟動。

面試被問到Redis實現(xiàn)發(fā)布與訂閱,手摸手教

發(fā)布消息

Redis中發(fā)布消息的命令是publish,具體使用如下所示:

面試被問到Redis實現(xiàn)發(fā)布與訂閱,手摸手教

PUBLISH test "haha":test表示頻道的名稱,haha表示發(fā)布的內(nèi)容,這樣就完成了一個一個消息的發(fā)布,后面的返回(integer)0表示0人訂閱。

訂閱頻道

于此同時再啟動一個窗口,這個窗口作為訂閱者,訂閱者的命令subscribe,使用SUBSCRIBE test就表示訂閱了test這個頻道面試被問到Redis實現(xiàn)發(fā)布與訂閱,手摸手教訂閱后返回的結(jié)果中由三條信息,第一個表示類型、第二個表示訂閱的頻道,第三個表示訂閱的數(shù)量。接著在第一個窗口進行發(fā)布消息:

面試被問到Redis實現(xiàn)發(fā)布與訂閱,手摸手教

可以看到發(fā)布者發(fā)布的消息,訂閱者都會實時的接收到,并發(fā)訂閱者收到的信息中也會出現(xiàn)三條信息,分別表示:返回值的類型、頻道名稱、消息內(nèi)容。

取消訂閱

若是想取消之前的訂閱可以使用unsubscribe命令,格式為:

unsubscribe??頻道名稱
//?取消之前訂閱的test頻道
unsubscribe??test

輸入命令后,返回以下結(jié)果:

[root@pinyoyougou-docker?src]#?./redis-cli?
127.0.0.1:6379>?UNSUBSCRIBE?test
1)?"unsubscribe"
2)?"test"
3)?(integer)?0

它分別表示:返回值的類型、頻道的名稱、該頻道訂閱的數(shù)量。

按模式訂閱

除了直接以特定的名城進行訂閱,還可以按照模式進行訂閱,模式的方式進行訂閱可以一次訂閱多個頻道,按照模式進行訂閱的命令為psubscribe,具體格式如下:

psubscribe??模式
//?表示訂閱名稱以ldc開頭的頻道
psubscribe??ldc*

輸入上面的命令后,返回如下結(jié)果:

127.0.0.1:6379>?PSUBSCRIBE?ldc*
Reading?messages...?(press?Ctrl-C?to?quit)
1)?"psubscribe"
2)?"ldc*"
3)?(integer)?1

這個也是非常簡單,分別表示:返回的類型(表示按模式訂閱類型)、訂閱的模式、訂閱數(shù)。

取消按模式訂閱

假如你想取消之前的按模式訂閱,可以使用punsubscribe來取消,具體格式:

punsubscribe?模式
//?取消頻道名稱按照ldc開頭的頻道
punsubscribe?ldc*

他的返回值,如下所示:

127.0.0.1:6379>?PUNSUBSCRIBE?ldc*
1)?"punsubscribe"
2)?"ldc*"
3)?(integer)?0

這個就不多說了,表示的意思和上面的一樣,可以看到上面的命令都是有規(guī)律的訂閱SUBSCRIBE,取消就是UNSUBSCRIBE,前面加前綴UN,按模式訂閱也是。

查看訂閱消息

(1)你想查看某一個模式下訂閱數(shù)是大于零的頻道,可以使用如下格式的命令進行操作:

pubsub?channels?模式
//?查看頻道名稱以ldc模式開頭的訂閱數(shù)大于零的頻道
pubsub?channels?ldc*

(2)假如你想查看某一個頻道的訂閱數(shù),可以使用如下命令:

pubsub?numsub?頻道名稱

(3)查看按照模式的訂閱數(shù),可以使用如下命令進行操作:

pubsub?numpat

到這里以上的命令操作就基本結(jié)束了,下面就來代碼實戰(zhàn)。

代碼實練

(1)首先第一步想要操作Redis,再SpringBoot項目中引入jedis的依賴,畢竟jedis是官方推薦使用操作Redis的工具。


????redis.clients
????jedis
????2.9.0

(2)然后創(chuàng)建發(fā)布者Publisher,用于消息的發(fā)布,具體代碼如下:

package?com.ldc.org.myproject.demo.redis;

import?java.io.BufferedReader;
import?java.io.IOException;
import?java.io.InputStreamReader;
import?redis.clients.jedis.Jedis;
import?redis.clients.jedis.JedisPool;

/**
?*?發(fā)布者
?*?@author?liduchang
?*
?*/
public?class?Publisher?extends?Thread{
?//?連接池?
?private?final?JedisPool?jedisPool;
?//?發(fā)布頻道名稱
?private?String?name;
?
?public?Publisher(JedisPool?jedisPool,?String?name)?{
??super();
??this.jedisPool?=?jedisPool;
??this.name?=?name;
?}
?
?@Override
?public?void?run()?{
??//?獲取要發(fā)布的消息
??BufferedReader?reader?=?new?BufferedReader(new?InputStreamReader(System.in));
??//?獲取連接
??Jedis?resource?=?jedisPool.getResource();
??while?(true)?{
???String?message?=?null;
???try?{
????message?=?reader.readLine();
????if?(!"exit".equals(message))?{
?????//?發(fā)布消息
?????resource.publish(name,?"發(fā)布者:"+Thread.currentThread().getName()+"發(fā)布消息:"+message);
????}?else?{
?????break;
????}
???}?catch?(IOException?e)?{
????e.printStackTrace();
???}
??}
?}
}

(3)接著創(chuàng)建訂閱類Subscriber,并且繼承JedisPubSub 類,重寫onMessage、onSubscribe、onUnsubscribe三個方法,這三個方法的調(diào)用時機在注釋上都有說明,具體的實現(xiàn)代碼如下:

package?com.ldc.org.myproject.demo.redis;

import?com.fasterxml.jackson.core.sym.Name;
import?redis.clients.jedis.JedisPubSub;

/**
?*?訂閱者
?*?@author?liduchang
?*/
public?class?Subscriber?extends?JedisPubSub?{
?//訂閱頻道名稱
?private?String?name;
?
?public?Subscriber(String?name)?{
??this.name?=?name;
?}

?/**
??*?訂閱者收到消息時會調(diào)用
??*/
?@Override
?public?void?onMessage(String?channel,?String?message)?{
??//?TODO?Auto-generated?method?stub
??super.onMessage(channel,?message);
??System.out.println("頻道:"+channel+"??接受的消息為:"+message);
?}

?/**
??*?訂閱了頻道會被調(diào)用
??*/
?@Override
?public?void?onSubscribe(String?channel,?int?subscribedChannels)?{
??System.out.println("訂閱了頻道:"+channel+"??訂閱數(shù)為:"+subscribedChannels);
?}

?/**
??*?取消訂閱頻道會被調(diào)用
??*/
?@Override
?public?void?onUnsubscribe(String?channel,?int?subscribedChannels)?{
??System.out.println("取消訂閱的頻道:"+channel+"??訂閱的頻道數(shù)量為:"+subscribedChannels);
?}
}

(4)這次創(chuàng)建的才是真正的訂閱者SubThread,上面的Subscriber是指為了測試實訂閱的時候或者發(fā)布消息,能夠有信息輸出:

package?com.ldc.org.myproject.demo.redis;

import?redis.clients.jedis.Jedis;
import?redis.clients.jedis.JedisPool;

/**
?*?訂閱者線程
?*?@author?liduchang
?*
?*/
public?class?SubThread?extends?Thread?{
?
?private?final?JedisPool?jedisPool;
?
?private?final?Subscriber?subscriber;
?
?private?String?name;
?
?public?SubThread(JedisPool?jedisPool,Subscriber?subscriber,String?name)?{
??super();
??this.jedisPool?=?jedisPool;
??this.subscriber?=?subscriber;
??this.name?=?name;
?}
?
?@Override
?public?void?run()?{
??Jedis?jedis?=?null;
??try?{
???jedis?=?jedisPool.getResource();
???//?訂閱頻道為name
???jedis.subscribe(subscriber,?name);
??}?catch?(Exception?e)?{
???System.err.println("訂閱失敗");
??????e.printStackTrace();
??}?finally?{
???if?(jedis!=null)?{
?????//?jedis.close();
?????//歸還連接到redis池中
????jedisPool.returnResource(jedis);
???}
??}
?}
}

(5)后面就是測試了,分別測試發(fā)布與訂閱的測試,發(fā)布者為TestPublisher,訂閱者為TestSubscriber

package?com.ldc.org.myproject.demo.redis;

import?java.util.concurrent.ExecutorService;
import?java.util.concurrent.Executors;
import?java.util.concurrent.TimeUnit;
import?redis.clients.jedis.JedisPool;

public?class?TestPublisher?{
?
?public?static?void?main(String[]?args)?throws?InterruptedException?{
??JedisPool?jedisPool?=?new?JedisPool("192.168.163.155");
??//?向ldc頻道發(fā)布消息
??Publisher?publisher?=?new?Publisher(jedisPool,?"ldc");
??publisher.start();
?}
}

訂閱者

package?com.ldc.org.myproject.demo.redis;

import?java.util.concurrent.ExecutorService;
import?java.util.concurrent.Executors;
import?java.util.concurrent.TimeUnit;

import?redis.clients.jedis.JedisPool;

public?class?TestSubscriber1?{
?
?public?static?void?main(String[]?args)?throws?InterruptedException?{
??JedisPool?jedisPool?=?new?JedisPool("192.168.163.155",6379);
??Subscriber?subscriber?=?new?Subscriber("黎杜");
??//?訂閱ldc頻道
??SubThread?thread=?new?SubThread(jedisPool,?subscriber,?"ldc");
??thread.start();
??Thread.sleep(600000);
??//?取消訂閱
??subscriber.unsubscribe("ldc");
?}
}

這里為了測試方便就直接創(chuàng)建線程的方式,更好的話可以使用線程池的方式通過線程池的submit方法來執(zhí)行線程,若是不用了可以使用shutdown方式關(guān)閉。

好了這一期的Redis的實現(xiàn)訂閱與發(fā)布的講解就說完了,我們下一期在講解Redis的集群的知識,下期再見。

特別推薦一個分享架構(gòu)+算法的優(yōu)質(zhì)內(nèi)容,還沒關(guān)注的小伙伴,可以長按關(guān)注一下:

面試被問到Redis實現(xiàn)發(fā)布與訂閱,手摸手教

面試被問到Redis實現(xiàn)發(fā)布與訂閱,手摸手教

面試被問到Redis實現(xiàn)發(fā)布與訂閱,手摸手教

長按訂閱更多精彩▼

面試被問到Redis實現(xiàn)發(fā)布與訂閱,手摸手教

如有收獲,點個在看,誠摯感謝


免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺僅提供信息存儲服務(wù)。文章僅代表作者個人觀點,不代表本平臺立場,如有問題,請聯(lián)系我們,謝謝!

本站聲明: 本文章由作者或相關(guān)機構(gòu)授權(quán)發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀點,本站亦不保證或承諾內(nèi)容真實性等。需要轉(zhuǎn)載請聯(lián)系該專欄作者,如若文章內(nèi)容侵犯您的權(quán)益,請及時聯(lián)系本站刪除。
關(guān)閉
關(guān)閉