请分享您的想法,在Clojure 2024调查问卷!

欢迎!请查看关于页面以获取更多关于此的信息。

0投票
core.async
我有两个(或更多)监听器订阅到了多通道上。我想使用它们中的全部,然后随意让其中一个(或多个)离开,同时不会阻塞其他消费者或发布者。最初它们工作正常,直到其中一个想要停止监听。我本以为退出监听的监听器需要(成为一位好市民,并将其)通道从多通道中取消订阅(否则死锁是系统性的)。但是,如果在离开的监听器有机会取消订阅其通道之前将消息放入多通道,那么它就会在主线程(正在同时放置更多消息)上创建死锁。我没有找到一种可以确保我能够及时取消订阅通道以避免这种竞争条件的方法。

一旦我重现了死锁,repl就会冻结,直到我按下ctrl-c中断。
我还尝试在取消订阅之前关闭订阅的通道,但结果是相同的。

在下面的代码片段中,最后的(println "I'm done. You will never see this")永远不会被执行。发布者和剩下的消费者(消费者1)即使消费者2正在良好的意愿下尝试离开,也会发生死锁。


(require '[clojure.core.async :refer (chan go <! >!! mult tap untap)])
(let [to-mult (chan 1)
      m (mult to-mult)]

  ;;消费者1
  (let [c (chan 1)]
    (tap m c)
    (go (loop []
          (when-let [v (<! c)]
            (println "1 Got! " v)
            (recur))
          (println "1 Exiting!"))))

  ;;消费者2
  (let [c (chan 1)]
    (tap m c)
    (go (loop []
          (when-let [v (<! c)]
            (when (= v 42)  ;; 当值为42时退出
              (println "2 Got! " v)
              (recur)))
          (println "2 about to leave!")
          (Thread/sleep 5000) ;; 稍等片刻以加剧竞争条件
          (untap m c) ;; 在取消订阅此读取器之前
          (println "2 Exiting."))))

   (println "about to put a few messages that work")
   (doseq [a (range 10)]
     (>!! to-mult 42))
   (println "about to put a message that will force the exit of 2")
   (>!! to-mult 43)
   (println "about to put a few more messages before reader 2 is unsubscribed to show the deadlock")
   (doseq [a (range 10)]
     (println "putting msg" a)
     (>!! to-mult 42))
   (println "I'm done. You will never see this"))



即将放置一些可以正常工作的消息
2 已获得! 42
1 已获得! 42
2 已获得! 42
1 已获得! 42
1 已获得! 42
2 已获得! 42
1 已获得! 42
1 已获得! 42
2 已获得! 42
2 已获得! 42
2 已获得! 42
2 已获得! 1 已获得! 42
422 已获得! 42

1 已获得! 42
1 已获得! 42
2 已获得! 42
1 已获得! 42
即将放置将强制退出 2 的消息
1 已获得! 42
2 已获得! 在 2 被取消订阅之前,将要放置一些更多的消息以显示死锁
42
放置 msg 1 已获得! 0
2 即将离开!
43
1 已获得! 42
放置 msg 1
放置 msg 2
放置 msg 3
1 已获得! 42
2 正在退出。

3 答案

0投票
by

评论者:gshayban

Mathieu,这可能是预期的。需要注意的是,在使用 mult 的时候,为了确保正确的顺序/流程,你应在多路复用的源/生产端强制执行,而不是异步地尝试在 tap 端进行。

在将值分配给它们之前,Mult 将解引用稳定的 tap 集合,并且在值分配过程中不会动态调整,除非有一个 tap 已经关闭(链接:1)。如果你想在不断开 tap 的情况下稳定地 untap,你可以/应该让 'producer' 在输入通道的值之间的有序方式下完成。

知道一个 put 发生在关闭的通道上是在 0.1.278 版本中引入的。

通常,在通道的消耗端离开是很棘手的。根据你程序的语义,如果通道的生产端没有意识到消费者端可能会发出 close!,你可能不得不启动一个排空操作。

(defn drain (link: c) (go (当 (some? (<! c)) (recur))))

Go 语言不允许关闭只读通道(链接:2)

可能需要更好的文档。

(link: 1) https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async.clj#L680-L682
(link: 2) https://golang.ac.cn/ref/spec#Close

0投票
by

评论者:samumbach

另请参阅 ASYNC-66。

0投票
by
参考资料:https://clojure.atlassian.net/browse/ASYNC-58(由 alex+import 报告)
...