请在 Clojure 2024 状态调查!中分享你的想法。

欢迎!请查看关于页面以获取更多此功能的信息。

0
core.async
我在一个多通道上订阅了两个(或更多)监听器。我想使用它们中的所有(然后让其中一些)随心所欲地退出,而不会阻塞其他消费者或发布者。最初它们工作正常,直到其中一个监听器想要停止监听。我认为退出的监听器必须(成为一个好市民)从多通道中取消订阅其通道(否则死锁是系统的)。然而,如果在监听器有机会取消订阅其通道之前向多通道中放入消息,它会在主线程(正在同时放入更多消息的线程)上创建死锁。我找不到一种方法来保证我可以及时取消订阅通道以避免这种竞争条件。

一旦我复现了死锁,REPL 就会冻结,直到我用 Ctrl-C 中断。
我还尝试在取消订阅之前关闭订阅的通道,结果是一样的。

在以下的代码片段中,最后的(println "I'm done. You will never see this")永不执行。发布者和剩余的消费者(消费者 1)即使消费者 2 正在良好地尝试退出也会发生死锁。


(require '[clojure.core.async :refer (chan go <! >!! mult tap untap)])
(let [to-mult (chan 1)
      m (mult to-mult)]

  ;;消费者 1
  (let [c (chan 1)]
    (tap m c)
    (go (loop []
          (when-let [v (<! c)]
            (println "1 Got! " v)
            (recur))
          (println "1 Exiting!"))))

  ;;消费者 2
  (let [c (chan 1)]
    (tap m c)
    (go (loop []
          (when-let [v (<! c)]
            (when (= v 42)  ;; 当值不是 42 时退出
              (println "2 Got! " v)
              (recur)))
          (println "2 about to leave!")
          (Thread/sleep 5000) ;; 等待一段时间以加剧竞争条件
          (untap m c) ;; 在取消这个读者的订阅之前
          (println "2 Exiting."))))

   (println "about to put a few messages that work")
   (doseq [a (range 10)]
     (>!! to-mult 42))
   (println "about to put a message that will force the exit of 2")
   (>!! to-mult 43)
   (println "about to put a few more messages before reader 2 is unsubscribed to show the deadlock")
   (doseq [a (range 10)]
     (println "putting msg" a)
     (>!! to-mult 42))
   (println "I'm done. You will never see this"))



about to put a few messages that work
2 Got!  42
1 Got!  42
2 Got!  42
1 Got!  42
1 Got!  42
2 Got!  42
1 Got!  42
1 Got!  42
2 Got!  42
2 Got!  42
2 Got!  42
2 已获取!  1 已获取!  42
422 已获取!  42

1 Got!  42
1 Got!  42
2 Got!  42
1 Got!  42
准备放入一条将强制退出2的消息
1 Got!  42
2 已获取!  在取消订阅2之前,准备放入更多消息以显示死锁
42
放入消息 1 已获取!  0
2 即将离开!
43
1 Got!  42
放入消息 1
放入消息 2
放入消息 3
1 Got!  42
2 正在退出。

3 个答案

0
by

评论由: gshayban 创建

Mathieu,这可能是预期结果。重要的是要注意,在使用mult时,为了保证正确的排序/流动,应在mult的源/生产者端强制执行,而不是在异步的tap端执行。

mult在向它们分发一个值之前,将在静态 taps 上解引用一个稳定的集合,并且在下发值过程中不会动态调整,除非一个tap已被关闭(链接:1)。如果您想在保持tap打开的情况下稳定地untap,可以/应该由'生产者'在有输入通道之间的值上以有序的方式进行。

在发布0.1.278版本后,了解对处于关闭状态通道的写入操作是新的。

通常,在通道的消费端撤离是复杂的。根据您的过程的语义,如果通道的生产者端没有意识到来自消费者端的关闭!可以发生,您可能需要启动一个排水操作。

(定义 drain (链接:c) (go (当 (! c) 有时 (递归))))

Go语言禁止关闭只读通道FWIW(链接:2)

可能需要更好的文档。

(链接:1) https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async.clj#L680-L682
(链接:2) https://golang.ac.cn/ref/spec#Close

0
by

评论由: samumbach 创建

另请参阅 ASYNC-66。

0
by
...