请在2024 年 Clojure 状态调查中分享您的想法!

欢迎!请参阅关于页面以获取更多关于本站如何工作的信息。

+1
core.async
当在源通道关闭附近对多路复用进行“触摸”时,可能不会关闭被“触摸”的通道。


(require '[clojure.core.async :refer (chan mult tap close!)])
(let [s (chan)
      m (mult s)
      c (chan)]
  (tap m c)
  (close! s)
  (impl/closed? c))


上述代码有时会返回 true,有时会返回 false。

*原因:* 这是由于 {{mult}} 函数中的以下代码引起的


(if (nil? val)
  (doseq [[c close?] @cs]
    (when close? (close! c)))


在 cs 解引用后“触摸”的任何通道都不会被关闭。

*方法:* 解决此问题的可能方法是为 cs 原子添加一个标志,表示多路复用是打开还是关闭。如果是关闭的,则任何被“触摸”的通道将自动关闭。


(let [s (chan)
      m (mult s)
      c (chan)]
  (close! s)
  (tap m c))  ;; 总会关闭 c


这可以通过向 cs 原子添加一个标志来实现,以表示多路复用是打开还是关闭。如果是关闭的,则任何被“触摸”的通道都会自动关闭。

7 答案

0

评论者:jreeves

为了参考,以下是我使用的自定义 mult 修复

`
(defn mult [ch]
(let [state (atom [true {}])

    m (reify
        Mux
        (muxch* [_] ch)
        Mult
        (tap* [_ ch close?]
          (let [add-ch    (fn [[o? cs]] [o? (if o? (assoc cs ch close?) cs)])
                [open? _] (swap! state add-ch)]
            (when-not open? (close! ch))
            nil))
        (untap* [_ ch]
          (swap! state (fn [[open? cs]] [open? (dissoc cs ch)]))
          nil)
        (untap-all* [_]
          (swap! state (fn [[open? _]] [open? {}]))))
    dchan (chan 1)
    dctr (atom nil)
    done (fn [_] (when (zero? (swap! dctr dec))
                   (put! dchan true)))]
(go-loop []
  (let [val (<! ch)]
    (if (nil? val)
      (let [[_ cs] (swap! state (fn [[_ cs]] [false cs]))]
        (doseq [[c close?] cs]
          (when close? (close! c))))
      (let [chs (keys (second @state))]
        (reset! dctr (count chs))
        (doseq [c chs]
          (when-not (put! c val done)
            (swap! dctr dec)
            (untap* m c)))
        (when (seq chs)
          (<! dchan))
        (recur)))))
m))

`

0

评论者:dnolen

这是否也已在 master 中修复?谢谢。

0

评论由:gshayban

我理解了这个场景,但说实话我不确定这是mult的问题还是使用方式的问题。一个通道不需要总是能进行take操作。对于“late tap”的消费者来说,可以使用alts或其他机制来防范,你还可以通过在生产侧实施政策来强制执行无late taps。

(链接:~richhickey)您可以提出您的看法吗?

0

评论者:jreeves

当前“tap”函数有一个显式的“close?”标志,如果一个tapped通道在源通道关闭时并不能保证关闭,那么这个参数可能不应该存在。另外,如果取消自动关闭taps,我们应该从“sub”中删除“close?”参数吗?

0

评论由:gshayban

这不仅仅是尊重标志。与关闭行为相关,通道可以在Mult过程愉快地将值分配给其他通道(如ABA问题)时进行tap和untap而没有任何接收。也可能使在将关闭状态分配给最后的dereference操作集之后进行tap成为一种错误。这与熟悉的永久nil接收不同,但mult已经在一些方面与简单通道有所不同。

0
_评论由:stuart.sierra_

我最近正在开发一个依赖于mult和pipeline默认行为自动关闭下游通道的系统。但是有时初始的“输入”通道非常快地关闭,而通道图仍处于构建状态。结果,一些输出通道保持了打开状态,一些go-loop仍然在运行。

在我这个案例中,修复方案是在任何处理之前尽早创建taps,但这让我开始思考默认行为应该是什么。

我所期望的行为是,当在带有close?参数为true(默认)的mult上调用tap时,如果mult的输入通道已经被关闭,则立即关闭传入tap的通道。
0
参考资料:https://clojure.atlassian.net/browse/ASYNC-64(由 alex+import 提交)
...