2024年Clojure状态调查!中分享您的想法。

欢迎!请参见关于页面以了解如何工作的更多信息。

0
core.async
我有两个(或更多)监听器订阅到一个多通道。我想使用它们全部,然后让其中一个人(或一些人)随时离开而不阻塞其他消费者或发布者。最初它们工作得很好,直到其中一个想要停止监听。我以为退出监听器需要(成为一个好公民并由)从多通道取消订阅其通道(否则死锁会更加普遍)。然而,如果消息在离开监听器有机会从多通道取消订阅前放入多通道中,它将创建主线程上的死锁(该线程正在同时投放更多消息)。我找不到一种保证可以及时取消订阅通道以避免这种竞争条件的方法。

一旦我重现了死锁,repl就冻结了,直到我用ctrl-c中断。
我还尝试在取消订阅之前关闭订阅的通道,但结果相同。

在下面的示例中,最后的(println "I'm done. You will never see this")永远不会抵达。尽管消费者2正试图以良好的方式离开,但发布者和剩余的消费者(消费者1)仍然处于死锁状态。


(require '[clojure.core.async :refer (chan go <! >!! mult tap untap)])
(let [to-mult (chan 1)
      m (mult to-mult)]

  ;;消费者1
  (let [c (chan 1)]
    (tap m c)
    (go (loop []
          (when-let [v (<! c)]
            (println "1 Got! " v)
            (recur))
          (println "1 Exiting!"))))

  ;;消费者2
  (let [c (chan 1)]
    (tap m c)
    (go (loop []
          (when-let [v (<! c)]
            (when (= v 42)  ;; 当值为42时退出
              (println "2 Got! " v)
              (recur)))
          (println "2 about to leave!")
          (Thread/sleep 5000) ;; 等待一段时间以加剧竞争条件
          (untap m c) ;; 在取消订阅此reader之前
          (println "2 Exiting."))))

   (println "about to put a few messages that work")
   (doseq [a (range 10)]
     (>!! to-mult 42))
   (println "about to put a message that will force the exit of 2")
   (>!! to-mult 43)
   (println "about to put a few more messages before reader 2 is unsubscribed to show the deadlock")
   (doseq [a (range 10)]
     (println "putting msg" a)
     (>!! to-mult 42))
   (println "I'm done. You will never see this"))



即将发送几条有效消息
2 收到! 42
1 收到! 42
2 收到! 42
1 收到! 42
1 收到! 42
2 收到! 42
1 收到! 42
1 收到! 42
2 收到! 42
2 收到! 42
2 收到! 42
2 收到! 1 收到! 42
422 收到! 42

1 收到! 42
1 收到! 42
2 收到! 42
1 收到! 42
即将发送一条迫使2退出的消息
1 收到! 42
2 收到! 将在2退订之前发送更多消息以显示死锁
42
发送消息 1 收到! 0
2 即将离开!
43
1 收到! 42
发送消息 1
发送消息 2
发送消息 3
1 收到! 42
2 正在退出。

3 答案

0
by

评论者:gshayban

Mathieu,这可能是预期的。需要注意的是,为了保证在使用多路复用时正确的排序/流程,应在多路复用的源/生产端强制执行,而不是在异步的方式上在塔侧执行。

在向它们分配值之前,多路复用会取消引用稳定的戴普集,并且在分发值过程中不会动态调整,除非有塔已经被关闭(链接:1)。如果您想在不关闭塔的情况下稳定地取消塔,您可以在输入通道的值之间按顺序让'生产者'这样做。

知道通道发生放入操作是新在发布0.1.278。

通常,在通道的消耗侧退出是棘手的。根据您进程的语义,如果通道的生产端不知道来自消费端的close! 操作可能会发生,您可能必须启动一个排水操作。

(定义drain(链接:c)(go(当(某些?(<! c))(递归))))

惯例不允许关闭只读通道,仅供参考(链接:2)

可能需要更好的文档。

(链接:1) https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async.clj#L680-L682
(链接:2) https://golang.ac.cn/ref/spec#Close

0
by

评论者:samumbach

另请参阅 ASYNC-66。

0
by
参考: https://clojure.atlassian.net/browse/ASYNC-58(由 alex+import 报告)
...