我正在尝试实现一个简单的带状态的 transducer,用来计数元素数量(使用 ClojureScript)
(defn stateful-counter []
(fn [xf]
(let [counter (atom 0)]
(fn
([] (xf))
([result]
(xf (xf result @counter)))
([result _]
(swap! counter inc)
result)))))
当我在一个序列上运行这个程序时,我得到了输出
(into [] (stateful-counter) (range 5))
[5]
这正好是我的预期。
当我在一个 core.async 通道上运行这个程序时,我得到了一个由 5 组成的无限序列
(go (println
(<! (let [c (async/chan 1 (stateful-counter))]
(async/onto-chan! c (range 5))
(async/into []
(async/take 10 c))))))
[5 5 5 5 5 5 5 5 5 5]
如果我不使用 (async/take 10 _),似乎会陷入无限循环。预期的结果是 [5]。
我还尝试使用 xforms 库中的 net.cgrand.xforms/count,得到了相同的不预期结果
(go (println
(<! (let [c (async/chan 1 xforms/count)]
(async/onto-chan! c (range 5))
(async/into []
(async/take 10 c))))))
我曾假设我在 stateful-counter 的实现中犯了一个错误造成循环。但不理解 xforms/count 产生了同样的(不预期)结果。
这是一个 ClojureScript 片段,但我已经能够用 Clojure 产生相同的不预期结果。
能有人帮我理解为什么在 core.async 通道上应用带状态的 transducer 会导致无限序列的产生吗?
编辑:我也在 Clojure 上看到了相同的不预期行为。