近期項目用到了緩存,我選用的是主流的google.guava作本地緩存,redis作分散式 緩存,先說說我對本地緩存和分散式緩存的理解吧,可能不太成熟的地方,大家指出,一起 學習.本地緩存的特點是速度快,不會受到網路阻塞的干擾,但由於是放在本地記憶體中,所 以容量較小,不能項目間共用比IO效率高比re ...
近期項目用到了緩存,我選用的是主流的google.guava作本地緩存,redis作分散式
緩存,先說說我對本地緩存和分散式緩存的理解吧,可能不太成熟的地方,大家指出,一起
學習.本地緩存的特點是速度快,不會受到網路阻塞的干擾,但由於是放在本地記憶體中,所
以容量較小,不能項目間共用比IO效率高比redis,且不會持久化.所以拿來存儲一些數據
很少,但又經常執行的,甚至只要啟動程式就會訪問的數據.
我們可以自定義初始化本地緩存的方法,指定存儲量和緩存淘汰機制.
/**
* 初始化本地緩存
*/
@PostConstruct
public void init() {
commonCache = CacheBuilder.newBuilder()
//設置緩存的
.initialCapacity(10)
//設置緩存中最大可以存儲的key數量,超過就會按照LRU的策略進行清除
.maximumSize(100).expireAfterWrite(60, TimeUnit.SECONDS).build();
}
幾種常見的記憶體淘汰機制:LRU,LFU
LFU:根據數據的歷史訪問頻率來淘汰數據
LRU:根據訪問時間的前後來淘汰數據,優先保留近期訪問的數據
guava是極度輕量級的cache,只具備基本的增刪改查和刷新數據,淘汰數據等功能,但能滿足大部分需求.
redis作為常用的分散式緩存,他是nosql資料庫,預設3主3從的無中心的分散式存儲架構,可以用來提高
併發量,分散式主鍵等功能.redis有rdb(快照)和aof兩種持久化到硬碟的方式,預設是rdb,快照能夠高
效且高性能的持久化數據,但存在數據丟失的風險,所以aof-日誌保存應運而出.aof能夠在用戶操作
redis時把每一個命令存儲下來,在出現宕機時,通過日誌還原數據.但這樣日誌文件就會非常大,redis
提供了一種重寫壓縮aof文件的機制BGREWRITEAOF命令.
redis集群的複製原理:
1、Slave啟動成功連接到master後會發送一個sync命令;
2、Master接到命令啟動後的存檔進程,同時收集所有接收到的用於修改數據集命令,在後臺進程執行
完畢之後,master
將傳送整個數據文件到slave,以完成一次完全同步;
3、全量複製:而slave服務在資料庫文件數據後,將其存檔並載入到記憶體中;
4、增量複製:Master繼續將新的所有收集到的修改命令依次傳給slave,完成同步;
5、但是只要是重新連接master,一次完全同步(全量複製)將被自動執行
redis採用的是純記憶體操作,非阻塞單線程多路復用,單線程可以避免上下文頻繁切換,多路復用提高效率.
Redis 採用的是定期刪除+惰性刪除策略。
定時刪除,用一個定時器來負責監視 Key,過期則自動刪除。雖然記憶體及時釋放,但是十分消耗 CPU 資源。
在大併發請求下,CPU 要將時間應用在處理請求,而不是刪除 Key,因此沒有採用這一策略。
定期刪除+惰性刪除是如何工作:
定期刪除,Redis 預設每個 100ms 檢查,是否有過期的 Key,有過期 Key 則刪除。
需要說明的是,Redis 不是每個 100ms 將所有的 Key 檢查一次,而是隨機抽取進行檢查(如果每隔 100ms,
全部 Key 進行檢查,Redis 豈不是卡死)。因此,如果只採用定期刪除策略,會導致很多 Key 到時間沒有
刪除。於是,惰性刪除派上用場。也就是說在你獲取某個 Key 的時候,Redis 會檢查一下,這個 Key
如果設置了過期時間,那麼是否過期了?如果過期了此時就會刪除。
採用定期刪除+惰性刪除就沒其他問題了麽?
不是的,如果定期刪除沒刪除 Key。然後你也沒即時去請求 Key,也就是說惰性刪除也沒生效。
這樣,Redis的記憶體會越來越高。那麼就應該採用記憶體淘汰機制。
過期策略存在的問題:
由於redis定期刪除是隨機抽取檢查,不可能掃描清除掉所有過期的key並刪除,然後一些key由於未被請求,
惰性刪除也未觸發。這樣redis的記憶體占用會越來越高。此時就需要記憶體淘汰機制 。
redis記憶體淘汰機制:
volatile-lru
從已設置過期時間的數據集中挑選最近最少使用的數據淘汰。redis並不是保證取得所有數據集中最近最少
使用的鍵值對,而只是隨機挑選的幾個鍵值對中的, 當記憶體達到限制的時候無法寫入非過期時間的數據集。
volatile-ttl
從已設置過期時間的數據集中挑選將要過期的數據淘汰。redis 並不是保證取得所有數據集中最近將要
過期的鍵值對,而只是隨機挑選的幾個鍵值對中的, 當記憶體達到限制的時候無法寫入非過期時間的數據集。
volatile-random
從已設置過期時間的數據集中任意選擇數據淘汰。當記憶體達到限制的時候無法寫入非過期時間的數據集。
allkeys-lru
從數據集中挑選最近最少使用的數據淘汰。當記憶體達到限制的時候,對所有數據集挑選最近最少使用的數據
淘汰,可寫入新的數據集。
allkeys-random
從數據集中任意選擇數據淘汰,當記憶體達到限制的時候,對所有數據集挑選隨機淘汰,可寫入新的數據集。
no-enviction
當記憶體達到限制的時候,不淘汰任何數據,不可寫入任何數據集,所有引起申請記憶體的命令會報錯。
下麵看看幾種策略的適用場景
allkeys-lru:如果我們的應用對緩存的訪問符合冪律分佈,也就是存在相對熱點數據,
或者我們不太清楚我們應用的緩存訪問分佈狀況,我們可以選擇allkeys-lru策略。
allkeys-random:如果我們的應用對於緩存key的訪問概率相等,則可以使用這個策略。
volatile-ttl:這種策略使得我們可以向Redis提示哪些key更適合被eviction。
另外,volatile-lru策略和volatile-random策略適合我們將一個Redis實例既應用於
緩存和又應用於持久化存儲的時候,然而我們也可以通過使用兩個Redis實例來達到相同的效果,
將key設置過期時間實際上會消耗更多的記憶體,因此我們建議使用allkeys-lru策略從而更有效率的使用記憶體。
redis的String字元串的應用場景:
String類型二進位安全
1、緩存功能:String字元串是最常用的數據類型,不僅僅是redis,各個語言都是最基本類型,
因此,利用redis作為緩存,配合其它資料庫作為存儲層,利用redis支持高併發的特點,可以大大加快系統的
讀寫速度、以及降低後端資料庫的壓力。
2、計數器:許多系統都會使用redis作為系統的實時計數器,可以快速實現計數和查詢的功能。
而且最終的數據結果可以按照特定的時間落地到資料庫或者其它存儲介質當中進行永久保存。
3、共用用戶session:用戶重新刷新一次界面,可能需要訪問一下數據進行重新登錄,或者訪問頁面緩存cookie,
但是可以利用redis將用戶的session集中管理,在這種模式只需要保證redis的高可用,
每次用戶session的更新和獲取都可以快速完成。大大提高效率。
4,限制某段時間內的訪問次數,就比如我們登錄的功能可以用手機獲取驗證碼登錄,但是我們發送驗證碼使用的第三方,
是多少錢多少條的,肯定不能讓他一直點,一直發簡訊,就算前端js做了校驗,若有些人用fiddler攔截繞過前臺就麻煩了,
這時候可以用redis的incr命令和expire結合起來做一個解決方案
5,分散式鎖 setnx expire(過期時間)
redis的hashmap類型的應用場景:
1,Hash存儲購物車數據的操作
2.存儲用戶關係,用戶id,年齡,姓名是key對應value值
3.hash 類型十分適合存儲對象類數據,相對於在 string 中介紹的把對象轉化為 json 字元串存儲,
hash 的結構可以任意添加或刪除‘欄位名’,更加高效靈活。
4.類似於表記錄的存儲
5.頁面視圖所需數據的存儲
redis的list類型的應用場景:
對數據量大的集合數據刪減
列表數據顯示、關註列表、粉絲列表、留言評價等…分頁、熱點新聞(Top5)等
利用LRANGE還可以很方便的實現分頁的功能,在博客系統中,每片博文的評論也可以存入一個單獨的list中。
任務隊列
(list通常用來實現一個消息隊列,而且可以確保先後順序,不必像MySQL那樣還需要通過ORDER BY來進行排序)
redis的set類型的應用場景:
對兩個集合間的數據[計算]進行交集、並集、差集運算
1、以非常方便的實現如共同關註、共同喜好、二度好友等功能。對上面的所有集合操作,你還可以使用不同的命令
選擇將結果返回給客戶端還是存儲到一個新的集合中。
2、利用唯一性,可以統計訪問網站的所有獨立 IP
redis的zset類型的應用場景:
常應用於:排行榜
1比如twitter 的public timeline可以以發表時間作為score來存儲,這樣獲取時就是自動按時間排好序的。
2比如一個存儲全班同學成績的Sorted Set,其集合value可以是同學的學號,而score就可以是其考試得分,
這樣在數據插入集合的時候,就已經進行了天然的排序
3還可以用Sorted Set來做帶權重的隊列,比如普通消息的score為1,重要消息的score為2,然後工作線程
可以選擇按score的倒序來獲取工作任務。讓重要的任務優先執行。
RESP 是redis客戶端和服務端之前使用的一種通訊協議;
RESP 的特點:實現簡單、快速解析、可讀性好
緩存和資料庫一致性解決方案
1.第一種方案:採用延時雙刪策略
在寫庫前後都進行redis.del(key)操作,並且設定合理的超時時間。
偽代碼如下
public void write(String key,Object data){
redis.delKey(key);
db.updateData(data);
Thread.sleep(500);
redis.delKey(key); }
2.具體的步驟就是:
1)先刪除緩存
2)再寫資料庫
3)休眠500毫秒
4)再次刪除緩存
2.方案二:使用消息隊列非同步更新,把更新的數據先同步到redis,再發送到消息隊列mq中,
由mysql資料庫自己到mq中讀取,以合適的速度也不易導致資料庫奔潰.
常見的mq有:kafka,ribbitmq,rocketmq等各有各的特點,但都能滿足這個需求.
mq的常見功能:非同步,解耦,削峰等可以極大的提高系統的高併發,高可用,高性能.
下麵是springboot集成redis的案例:
<!--redis start--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!--redis end-->
redis:
database: 0
host:
port: 6379
password: redis
我選擇用RedisTemplate來操作redis
private RedisTemplate<String, Object> redisTemplate;
//=============================common============================
/**
* 指定緩存失效時間
*
* @param key 鍵
* @param time 時間(秒)
* @return {@link Boolean}
*/
@Override
public boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根據key 獲取過期時間
*
* @param key 鍵 不能為null
* @return 時間(秒) 返回0代表為永久有效
*/
@Override
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
/**
* 判斷key是否存在
*
* @param key 鍵
* @return true 存在 false不存在
*/
@Override
public boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 刪除緩存
*
* @param key 可以傳一個值 或多個
*/
@Override
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete(CollectionUtil.arrayToList(key));
}
}
}
//============================String=============================
/**
* 普通緩存獲取
*
* @param key 鍵
* @return 值
*/
@Override
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
/**
* 普通緩存放入
*
* @param key 鍵
* @param value 值
* @return true成功 false失敗
*/
@Override
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 普通緩存放入並設置時間
*
* @param key 鍵
* @param value 值
* @param time 時間(秒) time要大於0 如果time小於等於0 將設置無限期
* @return true成功 false 失敗
*/
@Override
public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 遞增
*
* @param key 鍵
* @param delta 要增加幾(大於0)
* @return
*/
@Override
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("遞增因數必須大於0");
}
return redisTemplate.opsForValue().increment(key, delta);
}
/**
* 遞減
*
* @param key 鍵
* @param delta 要減少幾(小於0)
* @return
*/
@Override
public long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("遞減因數必須大於0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}
//================================Map=================================
/**
* HashGet
*
* @param key 鍵 不能為null
* @param item 項 不能為null
* @return 值
*/
@Override
public Object hget(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}
/**
* 獲取hashKey對應的所有鍵值
*
* @param key 鍵
* @return 對應的多個鍵值
*/
@Override
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}
/**
* HashSet
*
* @param key 鍵
* @param map 對應多個鍵值
* @return true 成功 false 失敗
*/
@Override
public boolean hmset(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* HashSet 並設置時間
*
* @param key 鍵
* @param map 對應多個鍵值
* @param time 時間(秒)
* @return true成功 false失敗
*/
@Override
public boolean hmset(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一張hash表中放入數據,如果不存在將創建
*
* @param key 鍵
* @param item 項
* @param value 值
* @return true 成功 false失敗
*/
@Override
public boolean hset(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一張hash表中放入數據,如果不存在將創建
*
* @param key 鍵
* @param item 項
* @param value 值
* @param time 時間(秒) 註意:如果已存在的hash表有時間,這裡將會替換原有的時間
* @return true 成功 false失敗
*/
@Override
public boolean hset(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 刪除hash表中的值
*
* @param key 鍵 不能為null
* @param item 項 可以使多個 不能為null
*/
@Override
public void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}
/**
* 判斷hash表中是否有該項的值
*
* @param key 鍵 不能為null
* @param item 項 不能為null
* @return true 存在 false不存在
*/
@Override
public boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}
/**
* hash遞增 如果不存在,就會創建一個 並把新增後的值返回
*
* @param key 鍵
* @param item 項
* @param by 要增加幾(大於0)
* @return
*/
@Override
public double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}
/**
* hash遞減
*
* @param key 鍵
* @param item 項
* @param by 要減少記(小於0)
* @return
*/
@Override
public double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}
//============================set=============================
/**
* 根據key獲取Set中的所有值
*
* @param key 鍵
* @return
*/
@Override
public Set<Object> sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 根據value從一個set中查詢,是否存在
*
* @param key 鍵
* @param value 值
* @return true 存在 false不存在
*/
@Override
public boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 將數據放入set緩存
*
* @param key 鍵
* @param values 值 可以是多個
* @return 成功個數
*/
@Override
public long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 將set數據放入緩存
*
* @param key 鍵
* @param time 時間(秒)
* @param values 值 可以是多個
* @return 成功個數
*/
@Override
public long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0) {
expire(key, time);
}
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 獲取set緩存的長度
*
* @param key 鍵
* @return
*/
@Override
public long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 移除值為value的
*
* @param key 鍵
* @param values 值 可以是多個
* @return 移除的個數
*/
@Override
public long setRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
//===============================list=================================
/**
* 獲取list緩存的內容
*
* @param key 鍵
* @param start 開始
* @param end 結束 0 到 -1代表所有值
* @return
*/
@Override
public List<Object> lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 獲取list緩存的長度
*
* @param key 鍵
* @return
*/
@Override
public long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 通過索引 獲取list中的值
*
* @param key 鍵
* @param index 索引 index>=0時, 0 表頭,1 第二個元素,依次類推;index<0時,-1,表尾,-2倒數第二個元素,依次類推
* @return
*/
@Override
public Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 將list放入緩存
*
* @param key 鍵
* @param value 值
* @return
*/
@Override
public boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 將list放入緩存
*
* @param key 鍵
* @param value 值