Clojure 2024 年度调查问卷 中分享您的想法!

欢迎!请参阅 关于 页面以了解更多关于此功能的信息。

+1投票
core.async
当一个多通道在关闭源通道的同一时间被选中时,选中的通道可能不会被关闭。


(require '[clojure.core.async :refer (chan mult tap close!)])
(let [s (chan)
      m (mult s)
      c (chan)]
  (tap m c)
  (close! s)
  (impl/closed? c))


上述代码有时会返回 true,有时会返回 false。

*原因:* 这是由于以下 {{mult}} 函数中的代码造成的


(if (nil? val)
  (doseq [[c close?] @cs]
    (when close? (close! c)))


cs 解引用之后任何被选中的通道都不会被关闭。

*方法:* 解决这个问题的可能方法是在 cs 原子上添加一个标志来表示多通道是打开还是关闭。如果它是关闭的,任何被选中的通道将自动关闭。


(let [s (chan)
      m (mult s)
      c (chan)]
  (close! s)
  (tap m c))  ;; 总是关闭 c


这可以通过给 cs 原子添加一个标志来实现,以表示多通道是开启还是关闭。如果关闭,任何被选中的通道将自动关闭。

7 答案

0投票

评论由:jreeves 发布

为了参考,以下是我自定义的 mult 修复代码

`
(defn mult [ch)
(let [state (atom [true {}])

    m (reify
        Mux
        (muxch* [_] ch)
        Mult
        (tap* [_ ch close?]
          (let [add-ch    (fn [[o? cs]] [o? (if o? (assoc cs ch close?) cs)])
                [open? _] (swap! state add-ch)]
            (when-not open? (close! ch))
            nil))
        (untap* [_ ch]
          (swap! state (fn [[open? cs]] [open? (dissoc cs ch)]))
          nil)
        (untap-all* [_]
          (swap! state (fn [[open? _]] [open? {}]))))
    dchan (chan 1)
    dctr (atom nil)
    done (fn [_] (when (zero? (swap! dctr dec))
                   (put! dchan true)))]
(go-loop []
  (let [val (<! ch)]
    (if (nil? val)
      (let [[_ cs] (swap! state (fn [[_ cs]] [false cs]))]
        (doseq [[c close?] cs]
          (when close? (close! c))))
      (let [chs (keys (second @state))]
        (reset! dctr (count chs))
        (doseq [c chs]
          (when-not (put! c val done)
            (swap! dctr dec)
            (untap* m c)))
        (when (seq chs)
          (<! dchan))
        (recur)))))
m))

`

0投票

评论由:dnolen 发布

这个在 master 中也已经修复了吗?谢谢。

0投票

评论者:gshayban

我理解了场景,但老实说,我不确定这是mult的bug还是使用问题。不应该期望信道总是产生take。消费者可以通过alts或其他机制来防护它,还可以通过在生产方面的策略上强制执行no-late-taps。

(链接:~richhickey)你能在这种情况下提出意见吗?

0投票

评论由:jreeves 发布

当前的"tap"函数有一个显式的"close?"标志,如果一个tapped信道在被源信道关闭时不保证关闭,那么这个参数可能不应该存在。另外,如果我们从自动关闭taps中移除,那么是否应该也从"sub"中移除"close?"参数?

0投票

评论者:gshayban

这不仅仅是尊重标志。与关闭行为相关,信道可以在没有任何接收的情况下去tap和untap,而mult过程则高兴地向另一组信道(如ABA问题)分布值。也可以将其变成一个错误,如果关闭状态被分发给最后的deref'ed信道组之后,再进行tap。这与熟悉的永久nil接收不同,但mult已经与简单的信道不同。

0投票
评论者:stuart.sierra

我最近正在做的一个系统依赖于{{mult}}和{{pipeline}}的默认行为来自动关闭下游信道。但有时最初的"输入"信道被迅速关闭,而信道图仍在构造中。结果,一些输出信道保持开启,一些go-loops继续运行。

我情况下的修复是在任何处理之前尽早创建taps,但这让我思考默认行为应该是什么。

我预期的行为是,当在带有{{close?}}参数为true(默认)的{{mult}}上调用{{tap}}时,如果mult的输入信道已经关闭,那么传递给{{tap}}的信道将被立即关闭。
0投票
参考:https://clojure.atlassian.net/browse/ASYNC-64(由 alex+import 报告)
...