请在 Clojure 2024 年调查问卷! 中分享您的想法。

欢迎!有关本站的工作方式,请参阅关于 页面以获取更多信息。

0
core.async
我在一个 multi channel 上有两个(或更多)监听器。我想使用它们全部,然后愿意有一个(或更多)监听器随时退出,而不会阻塞其他消费者或发布者。最初它们工作得很好,直到其中一个想要停止监听。我以为掉队的监听器需要(成为一个好公民)取消订阅 multi 通道上的其通道(否则死锁是系统的)。但是如果监听器在没有取消订阅其通道之前向多通道中放入消息,则会在主线程上(同时放入更多消息)产生死锁。我发现无法保证我可以在时间上取消订阅通道以避免这种竞争条件。

一旦我复现了死锁,repl 就会冻结,直到我使用 ctrl-c 中断。
我还试着在取消订阅它之前关闭了已订阅的通道,但结果是一样的。

在下面的片段中,最后的(println "I'm done. You will never see this")永远不会到达。即使消费者 2 正在善意地尝试离开,发布者和剩下的消费者(消费者 1)也会发生死锁。


(require '[clojure.core.async :refer (chan go <! >!! mult tap untap)])
(let [to-mult (chan 1)
      m (mult to-mult)]

  ;;消费者 1
  (let [c (chan 1)]
    (tap m c)
    (go (loop []
          (when-let [v (<! c)]
            (println "1 Got! " v)
            (recur))
          (println "1 Exiting!"))))

  ;;消费者 2
  (let [c (chan 1)]
    (tap m c)
    (go (loop []
          (when-let [v (<! c)]
          (when (= v 42)  ;; 当值不为 42 时退出
              (println "2 Got! " v)
              (recur)))
         (println "2 about to leave!")
          (Thread/sleep 5000) ;; 稍等一下以加剧竞争条件
          (untap m c) ;; 在取消订阅此订阅者之前
          (println "2 Exiting."))))

   (println "about to put a few messages that work")
   (doseq [a (range 10)]
     (>!! to-mult 42))
   (println "about to put a message that will force the exit of 2")
   (>!! to-mult 43)
   (println "about to put a few more messages before reader 2 is unsubscribed to show the deadlock")
   (doseq [a (range 10)]
     (println "放入消息" a)
     (>!! to-mult 42))
   (println "已完成。您将永远看不到这个"))



将要放入一些正常工作的消息
2 已接收!  42
1 已接收!  42
2 已接收!  42
1 已接收!  42
1 已接收!  42
2 已接收!  42
1 已接收!  42
1 已接收!  42
2 已接收!  42
2 已接收!  42
2 已接收!  42
2 已接收!  1 已接收!  42
422 已接收!  42

1 已接收!  42
1 已接收!  42
2 已接收!  42
1 已接收!  42
将要放入一条将导致2退出的消息
1 已接收!  42
2 已接收!  在2取消订阅之前将要放入一些更多消息以展示死锁
42
放入消息 1 已接收!  0
2 即将离开!
43
1 已接收!  42
放入消息 1
放入消息 2
放入消息 3
1 已接收!  42
2 正在退出。

3 答案

0
by

留言者:gshayban

Mathieu,这可能是预期的。重要的是要注意,在使用多播时,应确保在多播的源/生产侧强制执行,而不是在异步的吸嘴侧。

多播将在向它们分配值前解除稳定集吸嘴的引用,分发值过程中不会动态调整,除非吸嘴已关闭(链接:1)。如果您希望在不错开吸嘴的情况下稳定地取消吸嘴,您可以在输入通道之间有序地让“生产者”这样做。

在版本0.1.278中,了解到已向关闭的通道放入操作。

一般来说,在通道的消耗侧脱离比较棘手。根据您的进程的语义,如果通道的生产侧不知道来自消费侧的close!操作是否可能,您可能必须启动一个排水操作。

(defn drain (链接:c) (go (when (some? (

Go不允许关闭只读通道(顺便说一句,链接:2)

可能有必要撰写更好的文档。

(链接:1) https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async.clj#L680-L682
(链接:2) https://golang.ac.cn/ref/spec#Close

0
by

留言者:samumbach

另请参阅 ASYNC-66。

0
by
参考: https://clojure.atlassian.net/browse/ASYNC-58(由 alex+import 提出)
...