请在2024 年 Clojure 状态调查中分享您的想法!

欢迎!请查阅关于页面,了解更多有关如何使用本网站的信息。

+1
core.async
当多通道与源通道关闭几乎同时打开时,被打开的通道可能不会被关闭。


(require '[clojure.core.async :refer (chan mult tap close!)])
(let [s (chan)
      m (mult s)
      c (chan)]
  ;(tap m c)
  (close! s)
  (impl/closed? c))


上述代码有时会返回 true,有时会返回 false。

*原因:* 这是由以下 {{mult}} 函数中的代码引起的


(if (nil? val)
  (doseq [[c close?] @cs]
    (when close? (close! c)))


在 cs 解引用后的任何被打开的通道都不会被关闭。

*方法:* 一种可能的解决方案是始终关闭连接到已关闭源通道的通道。即


(let [s (chan)
      m (mult s)
      c (chan)]
  (close! s)
  ;(tap m c))  ;; 总是关闭 c


可以通过向 cs 原子添加一个标志来表示多通道是开启还是关闭。如果是关闭状态,则任何被打开的通道会自动关闭。

7 个回答

0

评论者:jreeves

为了参考,以下是我使用的针对 mult 的自定义修复

`
(defn mult [ch]
(let [state (atom [true {}])

    m (reify
        Mux
        (muxch* [_] ch)
        Mult
        (tap* [_ ch close?]
          (let [add-ch    (fn [[o? cs]] [o? (if o? (assoc cs ch close?) cs)])
                [open? _] (swap! state add-ch)]
            (when-not open? (close! ch))
            nil))
        (untap* [_ ch]
          (swap! state (fn [[open? cs]] [open? (dissoc cs ch)]))
          nil)
        (untap-all* [_]
          (swap! state (fn [[open? _]] [open? {}]))))
    dchan (chan 1)
    dctr (atom nil)
    done (fn [_] (when (zero? (swap! dctr dec))
                   (put! dchan true)))]
(go-loop []
  (let [val (<! ch)]
    (if (nil? val)
      (let [[_ cs] (swap! state (fn [[_ cs]] [false cs]))]
        (doseq [[c close?] cs]
          (when close? (close! c))))
      (let [chs (keys (second @state))]
        (reset! dctr (count chs))
        (doseq [c chs]
          (when-not (put! c val done)
            (swap! dctr dec)
            (untap* m c)))
        (when (seq chs)
          (<! dchan))
        (recur)))))
m))

`

0

评论者:dnolen

这是否也在 master 中修复了?谢谢。

0

评论者:gshayban

我理解了场景,但说实话,我不确定这是mult或使用的bug。一个通道不应该总是期望产生take。消费者可以使用alt或其他机制来保护它,你还可以通过事物“生产”端的策略来强制不产生“late taps”。

(链接:~richhickey)你能发表意见吗?

0

评论者:jreeves

当前“tap”函数有一个显式的“close?”标志,如果tapped通道在源通道关闭时不保证关闭,那么这个参数可能不应该存在。另外,如果移除了自动关闭tap,那么我们是否也应该从“sub”中移除“close?”参数?

0

评论者:gshayban

不只是要尊重标志。与关闭行为相关,通道可以在mult过程快乐地向另一组通道分配一个值时按下并释放而不接收任何东西(例如ABA问题)。还可以将关闭传递到最后被取消引用的通道集合后进行tap视为错误。这与熟悉的永久nil接收不同,但mult已经与简单通道不同。

0
_评论者:stuart.sierra_

我最近在开发依赖于{{mult}}和{{pipeline}}的默认行为来自动关闭下游通道的系统。但是,有时最初的“输入”通道关闭得太快,而通道图仍在被构建。结果,一些输出通道保持打开,一些go-loop继续运行。

我为此进行的修复是在任何处理之前创建tap,但这让我开始思考默认行为应该是怎样的。

我期望的行为是:当调用{{tap}}对一个{{mult}}并在其中使用{{close?}}参数为true(默认值)时,如果mult的输入通道已经关闭,那么传递给{{tap}}的通道会立即关闭。
0
参考:[https://clojure.atlassian.net/browse/ASYNC-64](https://clojure.atlassian.net/browse/ASYNC-64) (由 alex+import 报告)
...