2024 年 Clojure 状态调查! 分享您的想法。

欢迎!有关此功能的更多信息,请参阅 关于 页面。

+1
core.async
当多通道大约与源通道关闭同时被按下时,被按下的通道可能不会被关闭。


(require '[clojure.core.async :refer (chan mult tap close!)])
(let [s (chan)
      m (mult s)
      c (chan)]
  (tap m c)
  (close! s)
  (impl/closed? c))


上述代码有时返回 true,有时返回 false。

*原因:* 这是由以下 {{mult}} 函数中的代码引起的


(if (nil? val)
  (doseq [[c close?] @cs]
    (when close? (close! c)))


在 cs 解引用后按下的任何通道都不会被关闭。

*方法:* 解决这个问题的可能方法是在 cs 原子中始终关闭已连接到关闭源通道的通道。例如:


(let [s (chan)
      m (mult s)
      c (chan)]
  (close! s)
  (tap m c))  ;; 将始终关闭 c


这可以通过向 cs 原子添加一个标志来实现,以表示多通道是打开的还是关闭的。如果已关闭,将自动关闭任何连接的通道。

7 个答案

0

评论由:jreeves 提供

供参考,以下是我在 mult 中使用的自定义修复

`
(defn mult [ch]
(let [state (atom [true {}])

    m (reify
        Mux
        (muxch* [_] ch)
        Mult
        (tap* [_ ch close?]
          (let [add-ch    (fn [[o? cs]] [o? (if o? (assoc cs ch close?) cs)])
                [open? _] (swap! state add-ch)]
            (when-not open? (close! ch))
            nil))
        (untap* [_ ch]
          (swap! state (fn [[open? cs]] [open? (dissoc cs ch)]))
          nil)
        (untap-all* [_]
          (swap! state (fn [[open? _]] [open? {}]))))
    dchan (chan 1)
    dctr (atom nil)
    done (fn [_] (when (zero? (swap! dctr dec))
                   (put! dchan true)))]
(go-loop []
  (let [val (<! ch)]
    (if (nil? val)
      (let [[_ cs] (swap! state (fn [[_ cs]] [false cs]))]
        (doseq [[c close?] cs]
          (when close? (close! c))))
      (let [chs (keys (second @state))]
        (reset! dctr (count chs))
        (doseq [c chs]
          (when-not (put! c val done)
            (swap! dctr dec)
            (untap* m c)))
        (when (seq chs)
          (<! dchan))
        (recur)))))
m))

`

0

评论由:dnolen 提供

这是否也已在 master 中修复?谢谢。

0

评论者:gshayban

我理解了场景,但老实说,我不确定这是mult的问题还是使用方式的问题。一个通道不必总是返回一个take。可以采取措施(如alts或其他机制)来防范“迟到tap”消费者,并且你还可以通过更改“生产”方面的策略来禁止迟到tap。

(链接:~richhickey)你能评论一下吗?

0

评论由:jreeves 提供

"tap"函数目前有一个显式的"close?"标志,如果一个tapped通道在源通道关闭时不能保证关闭,那么这个参数可能不应该存在。另外,如果取消自动关闭taps,我们是否也应该在"sub"上移除"close?"参数?

0

评论者:gshayban

这不仅仅是尊重标志。与关闭行为相关,通道可以在没有接收任何内容的情况下进行tap和untap,同时mult过程仍将值分配给另一组通道(如ABA问题)。也可以将关闭后分发到最后一组解引用通道的tap作为错误。这与熟悉的永久nil接收不同,但mult已经与简单通道不同。

0
_评论者:stuart.sierra_

我最近在研究一个依赖于{{mult}}和{{pipeline}}默认行为自动关闭下游通道的系统。有时初始“输入”通道很快被关闭,而通道图仍在构建中。结果,一些输出通道保持打开状态,一些go-loops持续运行。

我解决的方案是在任何处理之前尽早创建taps,但它让我思考了默认行为应该是怎样的。

我预期的行为是,当在对{{mult}}调用{{tap}}并设置为{{close?}}参数为true(默认值)时,并且mult的输入通道已经关闭,那么传递给{{tap}}的通道立即关闭。
0
参考:<a href="https://clojure.atlassian.net/browse/ASYNC-64" rel="nofollow" target="_blank">https://clojure.atlassian.net/browse/ASYNC-64
欢迎来到 Clojure 问答社区,在这里您可以提出问题并获得 Clojure 社区成员的解答。

类别

...