2024 Clojure状态调查中分享你的想法!

欢迎!请查看关于页面获取更多关于如何使用本站的信息。

+1
core.async
当在接近源通道关闭的同时点击mult时,可能无法正确关闭被点击的通道。


(require '[clojure.core.async :refer (chan mult tap close!)])
(let [s (chan)
      m (mult s)
      c (chan)]
  (tap m c)
  (close! s)
  (impl/closed? c))


上述代码有时返回true,有时返回false。

*原因:* 这是由以下mult函数中的代码引起的


(if (nil? val)
  (doseq [[c close?] @cs]
    (when close? (close! c)))


在cs被解引用之后点击的任何通道将不会被关闭。

*方法:* 解决这个问题的可能方法是始终关闭点到已关闭源通道的通道。例如。


(let [s (chan)
      m (mult s)
      c (chan)]
  (close! s)
  (tap m c))  ;; 总是关闭c


可以通过向cs原子添加一个标志来实现在mult打开或关闭时的功能。如果它关闭了,任何点到已关闭源的通道将被自动关闭。

7 个回答

0

由:jreeves

为了参考,以下是我使用的mult的自定义修复

`
(defn mult [ch]
(let [state (atom [true {}])

    m (reify
        Mux
        (muxch* [_] ch)
        Mult
        (tap* [_ ch close?]
          (let [add-ch    (fn [[o? cs]] [o? (if o? (assoc cs ch close?) cs)])
                [open? _] (swap! state add-ch)]
            (when-not open? (close! ch))
            nil))
        (untap* [_ ch]
          (swap! state (fn [[open? cs]] [open? (dissoc cs ch)]))
          nil)
        (untap-all* [_]
          (swap! state (fn [[open? _]] [open? {}]))))
    dchan (chan 1)
    dctr (atom nil)
    done (fn [_] (when (zero? (swap! dctr dec))
                   (put! dchan true)))]
(go-loop []
  (let [val (<! ch)]
    (if (nil? val)
      (let [[_ cs] (swap! state (fn [[_ cs]] [false cs]))]
        (doseq [[c close?] cs]
          (when close? (close! c))))
      (let [chs (keys (second @state))]
        (reset! dctr (count chs))
        (doseq [c chs]
          (when-not (put! c val done)
            (swap! dctr dec)
            (untap* m c)))
        (when (seq chs)
          (<! dchan))
        (recur)))))
m))

`

0

由:dnolen

这也已在master版中修复了吗?谢谢。

0
答:

评论者:gshayban

我理解了场景,但老实说,我不确定这是mult或使用中的错误。不应该期望通道总是产生take。消费者可以通过alts或其他机制来防护“延迟tap”,你也可以通过“生产”方面的策略来强制不允许迟到的tap。

(链接:~richhickey)你能发表意见吗?

0
答:

由:jreeves

当前的“tap”函数有一个显式的“close?”标志,如果tapped通道不是在源通道关闭时总是保证关闭,那么该参数很可能不应该存在。此外,如果移除了自动关闭tap,那么在“sub”上也应该移除“close?”参数吗?

0
答:

评论者:gshayban

这不仅仅涉及到尊重标志。与关闭行为相关,通道在mult过程愉快地将值分配给其他通道集合时(例如ABA问题)可以取得和取消取得而不接收任何东西。这也可以在将关闭分配给最后退回的通道集合之后将取得变为错误。这与熟悉的永久nil receive不同,但mult已经不同于简单的通道。

0
答:
_评论者:stuart.sierra_

我最近在开发一个系统,该系统依赖于{{mult}}和{{pipeline}}的默认行为来自动关闭下游通道。但是,有时初始“输入”通道关闭得很快,而通道图仍在构建中。因此,一些输出通道保持打开,一些go-loop仍在运行。

在我遇到的修复方案中,我是在任何处理之前更早地创建了taps,但它让我思考默认行为应该是什么。

我期望的行为是,当在设置了{{close?}}参数为true(默认值)的{{mult}}上调用{{tap}}时,如果mult的输入通道已经关闭,那么传递给{{tap}}的通道会立即关闭。
0
参考:https://clojure.atlassian.net/browse/ASYNC-64(由alex+import报告)
...