请分享您的看法,参与 2024 Clojure 状态调查!

欢迎!请查阅 关于 页面以了解更多信息。

+1
core.async
当在源通道关闭的同时对多通道进行点击时,被点击的通道可能不会被关闭。


(require '[clojure.core.async :refer (chan mult tap close!)])
(let [s (chan)
      m (mult s)
      c (chan)]
  (tap m c)
  (close! s)
  (impl/closed? c))


上面的代码有时返回 true,有时返回 false。

*原因:* 这是由以下 {{mult}} 函数中的代码引起的


(if (nil? val)
  (doseq [[c close?] @cs]
    (when close? (close! c)))


在 cs 进行解引用后点击的任何通道都不会被关闭。

*方法:* 一种可能的解决方法是始终关闭连接到已关闭源的多通道。即


(let [s (chan)
      m (mult s)
      c (chan)]
  (close! s)
  (tap m c))  ;; 总是关闭 c


这可以通过在 cs 原子上添加一个标志来实现,表示多通道是打开还是关闭。如果是关闭的,任何被点击的通道都会自动关闭。

7 答案

0

评论者:jreeves

为了参考,以下是我使用的自定义的多通道修复函数

`
(defn mult [ch]
(let [state (atom [true {}])

    m (reify
        Mux
        (muxch* [_] ch)
        Mult
        (tap* [_ ch close?]
          (let [add-ch    (fn [[o? cs]] [o? (if o? (assoc cs ch close?) cs)])
                [open? _] (swap! state add-ch)]
            (when-not open? (close! ch))
            nil))
        (untap* [_ ch]
          (swap! state (fn [[open? cs]] [open? (dissoc cs ch)]))
          nil)
        (untap-all* [_]
          (swap! state (fn [[open? _]] [open? {}]))))
    dchan (chan 1)
    dctr (atom nil)
    done (fn [_] (when (zero? (swap! dctr dec))
                   (put! dchan true)))]
(go-loop []
  (let [val (<! ch)]
    (if (nil? val)
      (let [[_ cs] (swap! state (fn [[_ cs]] [false cs]))]
        (doseq [[c close?] cs]
          (when close? (close! c))))
      (let [chs (keys (second @state))]
        (reset! dctr (count chs))
        (doseq [c chs]
          (when-not (put! c val done)
            (swap! dctr dec)
            (untap* m c)))
        (when (seq chs)
          (<! dchan))
        (recur)))))
m))

`

0

评论者:dnolen

这也已在主分支中修复吗?谢谢。

0

评论者:gshayban

我理解这个场景,但说实话,我不确定这是mult的问题还是使用上的问题。一个通道不应该预期总是产生take。消费者可以使用alts或其他机制来抵御它,你也可以通过“生产”端的策略来实施无 late-taps。

(链接:~richhickey) 你能提出意见吗?

0

评论者:jreeves

"tap"函数目前有一个显式的"close?"标志,如果一个tapped通道在源通道关闭时不保证关闭,那么这个参数可能不应存在。另外,如果我们取出了自动关闭taps,那么是否也应该在"sub"上移除"close?"参数?

0

评论者:gshayban

不仅仅是尊重标志。与关闭行为相关,通道可以在接收到任何内容之前进行tap和untap操作,而mult进程却在愉快地将一个值分配给另一组通道(如ABA问题)。也可以将关闭操作分配到已分配给最后一级通道的次数上作为错误。这不同于熟悉的永久nil接收,但mult已经与简单通道有所不同。

0
_评论者:stuart.sierra_

我最近在开发一个系统,该系统依赖于{{mult}}和{{pipeline}}的默认行为来自动关闭下游通道。但有时初始的"输入"通道关闭得很快,而通道图仍在构造中。结果,一些输出通道被留下了,一些go-loop仍在运行。

我在这个例子中的修复是提前创建taps,在处理之前,这让我开始思考默认行为应该是什么。

我期望的行为是当在一个{{mult}}上调用{{tap}}并设置{{close?}}参数为true(默认值),并且mult的输入通道已经关闭时,传递给{{tap}}的通道立即关闭。
0
参考:https://clojure.atlassian.net/browse/ASYNC-64(由 alex+import 报告)
...