2024 年 Clojure 状态调查 中分享您的想法!

欢迎!请查阅 关于 页面,了解更多此网站的工作方式。

+1 投票
core.cache
重新标签

我们最近将 core.cache 应用到了一个网络环境中,并发现了多线程访问的缓存雪崩现象。回顾起来很显然,但在文档中对雪崩的提及使我认为这已经被处理了。

Slack 的帮助下,我已经在这里重现了这个问题

(let [thread-count 20
      cache-atom (-> {}
                   (cache/ttl-cache-factory :ttl 120000)
                   (cache/lu-cache-factory :threshold 100)
                   (atom))
      latch (java.util.concurrent.CountDownLatch. thread-count)
      invocations-counter (atom 0)]
 (doseq [i (range thread-count)]
   (println "starting thread" i)
   (.start (Thread. (fn []
                     (cache-wrapped/lookup-or-miss cache-atom "my-key"
                                                   (fn [k]
                                                     (swap! invocations-counter inc)
                                                     (Thread/sleep 3000)
                                                     "some value"))
                     (.countDown latch)))))

 (.await latch)
 (deref invocations-counter))

我预计调用计数器的值应该是 1,但实际上它是 20(每个线程调用一次)。

根据使用 core.memoize 的建议,这按照预期工作

    (let [thread-count 20
          invocations-counter (atom 0)
          expensive-function (fn [k]
                             (swap! invocations-counter inc)
                             (Thread/sleep 3000)
                             (str "value-" k))
          cache (-> {}
                  (cache/ttl-cache-factory :ttl 120000)
                  (cache/lu-cache-factory :threshold 100))
          memoized-function (memoize/memoizer expensive-function cache)
     latch (java.util.concurrent.CountDownLatch. thread-count)]
     (doseq [i (range thread-count)]
       (println "starting thread" i)
       (.start (Thread. (fn []
                         (memoized-function "my-key")
                         (.countDown latch)))))

     (.await latch)
     (assert (= 1 (deref invocations-counter))))

2 个答案

+1 投票
谢谢,Alex!
+1 投票

lookup-or-miss 确实有一个并发故障。在第 57 行和 67 行,延迟的 deref-ed 值通过 swap! 插入缓存。这个延迟只能执行一次,但是当许多线程同时对相同值进行缓存失效时,许多线程将:
获取值。因此,在值被插入缓存之前,将会执行许多操作。

https://github.com/clojure/core.cache/blob/master/src/main/clojure/clojure/core/cache/wrapped.clj#L57
https://github.com/clojure/core.cache/blob/master/src/main/clojure/clojure/core/cache/wrapped.clj#L67

为了修复此问题,包装的命名空间必须将延迟存储在缓存中,并在检索时仅解析这些延迟。

当前版本的解决方案是你自己完成此包装

;; clj -Sdeps '{:deps {org.clojure/core.cache {:mvn/version "1.0.225"}}}'

(require '[clojure.core.cache :as cache]
         '[clojure.core.cache.wrapped :as cache-wrapped])

(let [thread-count 20
      cache-atom (-> {}
                   (cache/ttl-cache-factory :ttl 120000)
                   (cache/lu-cache-factory :threshold 100)
                   (atom))
      latch (java.util.concurrent.CountDownLatch. thread-count)
      invocations-counter (atom 0)]
 (doseq [i (range thread-count)]
   (println "starting thread" i)
   (.start (Thread. (fn []
                     ;; Deref the outcome
                     @(cache-wrapped/lookup-or-miss cache-atom "my-key"
                                                   (fn [k]
                                                     ;; Put the action in a delay
                                                     (delay
                                                       (swap! invocations-counter inc)
                                                       (Thread/sleep 3000)
                                                       "some value")))
                     (.countDown latch)))))

 (.await latch)
 (deref invocations-counter))
by
我认为修复这个问题可能会比较困难,因为这会导致现有的代码出错,因为API的每个路径都需要对这些新的延迟进行包装/解包 -- 包括种子需要将提供的基数中的每个值都包装起来,这可能已经是一个组合的缓存数据结构。
...