Guava Cache 原理分析與最佳實踐

梓川耶 2021-08-15 12:58:41 阅读数:476

本文一共[544]字,预计阅读时长:1分钟~
guava cache 原理 分析 最佳

前言

目前大部分互聯網架構 Cache 已經成為了必可不少的一環。常用的方案有大家熟知的 NoSQL 數據庫(Redis、Memcached),也有大量的進程內緩存比如 EhCache 、Guava Cache、Caffeine 等。

本系列文章會選取本地緩存和分布式緩存(NoSQL)的優秀框架比較他們各自的優缺點、應用場景、項目中的最佳實踐以及原理分析。本文主要針對本地 Cache 的老大哥 Guava Cache 進行介紹和分析。

基本用法

Guava Cache 通過簡單好用的 Client 可以快速構造出符合需求的 Cache 對象,不需要過多複雜的配置,大多數情况就像構造一個 POJO 一樣的簡單。這裏介紹兩種構造 Cache 對象的方式:CacheLoaderCallable

CacheLoader

構造 LoadingCache 的關鍵在於實現 load 方法,也就是在需要 訪問的緩存項不存在的時候 Cache 會自動調用 load 方法將數據加載到 Cache 中。這裏你肯定會想假如有多個線程過來訪問這個不存在的緩存項怎麼辦,也就是緩存的並發問題如何怎麼處理是否需要人工介入,這些在下文中也會介紹到。

除了實現 load 方法之外還可以配置緩存相關的一些性質,比如過期加載策略、刷新策略 。

private static final LoadingCache<String, String> CACHE = CacheBuilder
.newBuilder()
// 最大容量為 100 超過容量有對應的淘汰機制,下文詳述
.maximumSize(100)
// 緩存項寫入後多久過期,下文詳述
.expireAfterWrite(60 * 5, TimeUnit.SECONDS)
// 緩存寫入後多久自動刷新一次,下文詳述
.refreshAfterWrite(60, TimeUnit.SECONDS)
// 創建一個 CacheLoader,load 錶示緩存不存在的時候加載到緩存並返回
.build(new CacheLoader<String, String>() {
// 加載緩存數據的方法
@Override
public String load(String key) {
return "cache [" + key + "]";
}
});
public void getTest() throws Exception {
CACHE.get("KEY_25487");
}

Callable


除了在構造 Cache 對象的時候指定 load 方法來加載緩存外,我們亦可以在獲取緩存項時指定載入緩存的方法,並且可以根據使用場景在不同的比特置采用不同的加載方式。

比如在某些比特置可以通過二級緩存加載不存在的緩存項,而有些比特置則可以直接從 DB 加載緩存項。

// 注意返回值是 Cache
private static final Cache<String, String> SIMPLE_CACHE = CacheBuilder
.newBuilder()
.build();
public void getTest1() throws Exception {
String key = "KEY_25487";
// get 緩存項的時候指定 callable 加載緩存項
SIMPLE_CACHE.get(key, () -> "cache [" + key + "]");
}

緩存項加載機制


如果某個緩存過期了或者緩存項不存在於緩存中,而恰巧此此時有大量請求過來請求這個緩存項,如果沒有保護機制就會導致大量的線程同時請求數據源加載數據並生成緩存項,這就是所謂的 “緩存擊穿”

舉個簡單的例子,某個時刻有 100 個請求同時請求 KEY_25487 這個緩存項,而不巧這個緩存項剛好失效了,那麼這 100 個線程(如果有這麼多機器和流量的話)就會同時從 DB 加載這個數據,很可怕的點在於就算某一個線程率先獲取到數據生成了緩存項,其他的線程還是繼續請求 DB 而不會走到緩存

【緩存擊穿圖例】


看到上面這個圖或許你已經有方法解這個問題了,如果多個線程過來如果我們 只讓一個線程去加載數據生成緩存項,其他線程等待然後讀取生成好的緩存項 豈不是就完美解决。那麼恭喜你在這個問題上,和 Google 工程師的思路是一致的。不過采用這個方案,問題是解了但沒有完全解,後面會說到它的缺陷。

其實 Guava Cache 在 load 的時候做了並發控制,在多個線程請求一個不存在或者過期的緩存項時保證只有一個線程進入 load 方法,其他線程等待直到緩存項被生成,這樣就避免了大量的線程擊穿緩存直達 DB 。不過試想下如果有上萬 QPS 同時過來會有大量的線程阻塞導致線程無法釋放,甚至會出現線程池滿的尷尬場景,這也是說為什麼這個方案解了 “緩存擊穿” 問題但又沒完全解。

上述機制其實就是 expireAfterWrite/expireAfterAccess 來控制的,如果你配置了過期策略對應的緩存項在過期後被訪問就會走上述流程來加載緩存項。

緩存項刷新機制


緩存項的刷新和加載看起來是相似的,都是讓緩存數據處於最新的狀態。區別在於:

  • 緩存項加載是一個被動 的過程,而 緩存刷新是一個主動觸發 動作。如果緩存項不存在或者過期只有下次 get 的時候才會觸發新值加載。而緩存刷新則更加主動替換緩存中的老值。
  • 另外一個很重要點的在於,緩存刷新的項目一定是存在緩存中 的,他是對老值的替換而非是對 NULL 值的替換。

由於緩存項刷新的前提是該緩存項存在於緩存中,那麼緩存的刷新就不用像緩存加載的流程一樣讓其他線程等待而是允許一個線程去數據源獲取數據,其他線程都先返回老值直到异步線程生成了新緩存項

這個方案完美解决了上述遇到的 “緩存擊穿” 問題,不過 他的前提是已經生成緩存項了 。在實際生產情况下我們可以做 緩存預熱 ,提前生成緩存項,避免流量洪峰造成的線程堆積。

這套機制在 Guava Cache 中是通過 refreshAfterWrite 實現的,在配置刷新策略後,對應的緩存項會按照設定的時間定時刷新,避免線程阻塞的同時保證緩存項處於最新狀態。

但他也不是完美的,比如他的限制是緩存項已經生成,並且 如果恰巧你運氣不好,大量的緩存項同時需要刷新或者過期, 就會有大量的線程請求 DB,這就是常說的 “緩存血崩”

緩存項异步刷新機制


上面說到緩存項大面積失效或者刷新會導致雪崩,那麼就只能限制訪問 DB 的數量了,比特置有三個地方:

  1. 源頭:因為加載緩存的線程就是前臺請求線程,所以如果 控制請求線程數量 的確是减少大面積失效對 DB 的請求,那這樣一來就不存在高並發請求,就算不用緩存都可以。

  2. 中間層緩沖:因為請求線程和訪問 DB 的線程是同一個,假如在 中間加一層緩沖,通過一個後臺線程池去异步刷新緩存 所有請求線程直接返回老值,這樣對於 DB 的訪問的流量就可以被後臺線程池的池大小控住。

  3. 底層:直接 控 DB 連接池的池大小,這樣訪問 DB 的連接數自然就少了,但是如果大量請求到連接池發現獲取不到連接程序一樣會出現連接池滿的問題,會有大量連接被拒絕的异常。

所以比較合適的方式是通過添加一個异步線程池异步刷新數據,在 Guava Cache 中實現方案是重寫 CacheLoader 的 reload 方法


private static final LoadingCache<String, String> ASYNC_CACHE = CacheBuilder.newBuilder()
.build(
CacheLoader.asyncReloading(new CacheLoader<String, String>() {
@Override
public String load(String key) {
return key;
}
@Override
public ListenableFuture<String> reload(String key, String oldValue) throws Exception {
return super.reload(key, oldValue);
}
}, new ThreadPoolExecutor(5, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<>()))
);

LocalCache 源碼分析


先整體看下 Cache 的類結構,下面的這些子類錶示了不同的創建方式本質還都是 LocalCache

【Cache 類圖】

核心代碼都在 LocalCache 這個文件中,並且通過這個繼承關系可以看出 Guava Cache 的本質就是 ConcurrentMap。

【LocalCache 繼承與實現】

在看源碼之前先理一下流程,先理清思路。如果想直接看源碼理解流程可以先跳過這張圖 ~

【 get 緩存數據流程圖】

這裏核心理一下 Get 的流程,put 階段比較簡單就不做分析了。

LocalCache#get


V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
int hash = hash(checkNotNull(key));
// 根據 hash 獲取對應的 segment 然後從 segment 獲取具體值
return segmentFor(hash).get(key, hash, loader);
}

Segment#get


V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
checkNotNull(key);
checkNotNull(loader);
try {
// count 錶示在這個 segment 中存活的項目個數
if (count != 0) {
// 獲取 segment 中的元素 (ReferenceEntry) 包含正在 load 的數據
ReferenceEntry<K, V> e = getEntry(key, hash);
if (e != null) {
long now = map.ticker.read();
// 獲取緩存值,如果是 load,invalid,expired 返回 null,同時檢查是否過期了,過期移除並返回 null
V value = getLiveValue(e, now);
if (value != null) {
// 記錄訪問時間
recordRead(e, now);
// 記錄緩存命中一次
statsCounter.recordHits(1);
// 刷新緩存並返回緩存值 ,後面展開
return scheduleRefresh(e, key, hash, value, now, loader);
}
ValueReference<K, V> valueReference = e.getValueReference();
// 如果在 loading 等著 ,後面展開
if (valueReference.isLoading()) {
return waitForLoadingValue(e, key, valueReference);
}
}
}
// 走到這說明從來沒寫入過值 或者 值為 null 或者 過期(數據還沒做清理),後面展開
return lockedGetOrLoad(key, hash, loader);
} catch (ExecutionException ee) {
Throwable cause = ee.getCause();
if (cause instanceof Error) {
throw new ExecutionError((Error) cause);
} else if (cause instanceof RuntimeException) {
throw new UncheckedExecutionException(cause);
}
throw ee;
} finally {
postReadCleanup();
}
}

Segment#scheduleRefresh


// com.google.common.cache.LocalCache.Segment#scheduleRefresh
V scheduleRefresh(
ReferenceEntry<K, V> entry,
K key,
int hash,
V oldValue,
long now,
CacheLoader<? super K, V> loader) {
if (
// 配置了刷新策略 refreshAfterWrite
map.refreshes()
// 到刷新時間了
&& (now - entry.getWriteTime() > map.refreshNanos)
// 沒在 loading
&& !entry.getValueReference().isLoading()) {
// 開始刷新,下面展開
V newValue = refresh(key, hash, loader, true);
if (newValue != null) {
return newValue;
}
}
return oldValue;
}
// com.google.common.cache.LocalCache.Segment#refresh
V refresh(K key, int hash, CacheLoader<? super K, V> loader, boolean checkTime) {
// 插入 loading 節點
final LoadingValueReference<K, V> loadingValueReference =
insertLoadingValueReference(key, hash, checkTime);
if (loadingValueReference == null) {
return null;
}
// 异步刷新,下面展開
ListenableFuture<V> result = loadAsync(key, hash, loadingValueReference, loader);
if (result.isDone()) {
try {
return Uninterruptibles.getUninterruptibly(result);
} catch (Throwable t) {
// don't let refresh exceptions propagate; error was already logged
}
}
return null;
}
// com.google.common.cache.LocalCache.Segment#loadAsync
ListenableFuture<V> loadAsync(
final K key,
final int hash,
final LoadingValueReference<K, V> loadingValueReference,
CacheLoader<? super K, V> loader) {
// 通過 loader 异步加載數據,下面展開
final ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
loadingFuture.addListener(
new Runnable() {
@Override
public void run() {
try {
getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
} catch (Throwable t) {
logger.log(Level.WARNING, "Exception thrown during refresh", t);
loadingValueReference.setException(t);
}
}
},
directExecutor());
return loadingFuture;
}
// com.google.common.cache.LocalCache.LoadingValueReference#loadFuture
public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) {
try {
stopwatch.start();
// oldValue 指在寫入 loading 節點前這個比特置的值,如果這個比特置之前沒有值 oldValue 會被賦值為 UNSET
// UNSET.get() 值為 null ,所以這個緩存項從來沒有進入緩存需要同步 load 具體原因前面提到了,如果通過
// 异步 reload ,由於沒有老值會導致其他線程返回的都是 null
V previousValue = oldValue.get();
if (previousValue == null) {
V newValue = loader.load(key);
return set(newValue) ? futureValue : Futures.immediateFuture(newValue);
}
// 异步 load
ListenableFuture<V> newValue = loader.reload(key, previousValue);
if (newValue == null) {
return Futures.immediateFuture(null);
}
// To avoid a race, make sure the refreshed value is set into loadingValueReference
// *before* returning newValue from the cache query.
return transform(
newValue,
new com.google.common.base.Function<V, V>() {
@Override
public V apply(V newValue) {
LoadingValueReference.this.set(newValue);
return newValue;
}
},
directExecutor());
} catch (Throwable t) {
ListenableFuture<V> result = setException(t) ? futureValue : fullyFailedFuture(t);
if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
return result;
}
}

Segment#waitForLoadingValue


V waitForLoadingValue(ReferenceEntry<K, V> e, K key, ValueReference<K, V> valueReference)
throws ExecutionException {
// 首先你要是一個 loading 節點
if (!valueReference.isLoading()) {
throw new AssertionError();
}
checkState(!Thread.holdsLock(e), "Recursive load of: %s", key);
// don't consider expiration as we're concurrent with loading
try {
V value = valueReference.waitForValue();
if (value == null) {
throw new InvalidCacheLoadException("CacheLoader returned null for key " + key + ".");
}
// re-read ticker now that loading has completed
long now = map.ticker.read();
recordRead(e, now);
return value;
} finally {
statsCounter.recordMisses(1);
}
}
// com.google.common.cache.LocalCache.LoadingValueReference#waitForValue
public V waitForValue() throws ExecutionException {
return getUninterruptibly(futureValue);
}
// com.google.common.util.concurrent.Uninterruptibles#getUninterruptibly
public static <V> V getUninterruptibly(Future<V> future) throws ExecutionException {
boolean interrupted = false;
try {
while (true) {
try {
// hang 住,如果該線程被打斷了繼續回去 hang 住等結果,直到有結果返回
return future.get();
} catch (InterruptedException e) {
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}

Segment#lockedGetOrLoad


V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
ReferenceEntry<K, V> e;
ValueReference<K, V> valueReference = null;
LoadingValueReference<K, V> loadingValueReference = null;
boolean createNewEntry = true;
// 要對 segment 寫操作 ,先加鎖
lock();
try {
// re-read ticker once inside the lock
long now = map.ticker.read();
preWriteCleanup(now);
// 這裏基本就是 HashMap 的代碼,如果沒有 segment 的數組下標沖突了就拉一個鏈錶
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash
&& entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
valueReference = e.getValueReference();
// 如果在加載中 不做任何處理
if (valueReference.isLoading()) {
createNewEntry = false;
} else {
V value = valueReference.get();
// 如果緩存項為 null 數據已經被删除,通知對應的 queue
if (value == null) {
enqueueNotification(
entryKey, hash, value, valueReference.getWeight(), RemovalCause.COLLECTED);
// 這個是 double check 如果緩存項過期 數據沒被删除,通知對應的 queue
} else if (map.isExpired(e, now)) {
// This is a duplicate check, as preWriteCleanup already purged expired
// entries, but let's accommodate an incorrect expiration queue.
enqueueNotification(
entryKey, hash, value, valueReference.getWeight(), RemovalCause.EXPIRED);
// 再次看到的時候這個比特置有值了直接返回
} else {
recordLockedRead(e, now);
statsCounter.recordHits(1);
return value;
}
// immediately reuse invalid entries
writeQueue.remove(e);
accessQueue.remove(e);
this.count = newCount; // write-volatile
}
break;
}
}
// 沒有 loading ,創建一個 loading 節點
if (createNewEntry) {
loadingValueReference = new LoadingValueReference<>();
if (e == null) {
e = newEntry(key, hash, first);
e.setValueReference(loadingValueReference);
table.set(index, e);
} else {
e.setValueReference(loadingValueReference);
}
}
} finally {
unlock();
postWriteCleanup();
}
if (createNewEntry) {
try {
// Synchronizes on the entry to allow failing fast when a recursive load is
// detected. This may be circumvented when an entry is copied, but will fail fast most
// of the time.
synchronized (e) {
return loadSync(key, hash, loadingValueReference, loader);
}
} finally {
statsCounter.recordMisses(1);
}
} else {
// The entry already exists. Wait for loading.
return waitForLoadingValue(e, key, valueReference);
}
}

總結


結合上面圖以及源碼我們發現在整個流程中 GuavaCache 是沒有額外的線程去做數據清理和刷新的,基本都是通過 Get 方法來觸發這些動作 ,减少了設計的複雜性和降低了系統開銷。

簡單回顧下 Get 的流程以及在每個階段做的事情,返回的值。首先判斷緩存是否過期然後判斷是否需要刷新,如果過期了就調用 loading 去同步加載數據(其他線程阻塞),如果是僅僅需要刷新調用 reloading 异步加載(其他線程返回老值)。

所以如果 refreshTime > expireTime 意味著永遠走不到緩存刷新邏輯,緩存刷新是為了在緩存有效期內盡量保證緩存數據一致性所以在配置刷新策略和過期策略時一定保證 refreshTime < expireTime 。

最後關於 Guava Cache 的使用建議 (最佳實踐) :

  1. 如果刷新時間配置的較短一定要重載 reload 异步加載數據的方法,傳入一個自定義線程池保護 DB
  2. 失效時間一定要大於刷新時間
  3. 如果是常駐內存的一些少量數據失效時間可以配置的較長刷新時間配置短一點 (根據業務對緩存失效容忍度)
版权声明:本文为[梓川耶]所创,转载请带上原文链接,感谢。 https://gsmany.com/2021/08/20210815125809543g.html