我对这个想法有点玩心,想和大家分享一下。本质上是一种向原子添加内容的方法,同时保持对结果是否 reduced?
的跟踪。有点像你可以给 core.async/chan
提供一个转换器
(defn mkadder
"returns a function which, given a value, adds that value the the atom a with the reducing function rf. If the result is reduced, deliver the dereferenced result to promise p."
[a p rf]
(fn [x]
(let [result (rf a x)]
(if (reduced? result)
(do (deliver p @@result) @result) ;;what to return?
result))))
(defn acc
"accumulates state in an atom subject to a transducer. returns a map
with the keys :adder, :a and :p. Use the :add! function to add
state. :p is a promise which will be delivered the state in a when
rf is realized"
([xf rf] (acc xf rf (rf)))
([xf rf init]
(let [a (atom init)
swapper (fn [acc x] (doto acc (swap! rf x)))
rf (xf swapper)
p (promise)]
{:add! (mkadder a p rf) :a a :p p})))
;;demo
(let [{:keys [add! a p]}
(acc (comp (map inc) (filter even?) (take 5)) conj)]
(future
(doseq [i (range 20)]
(add! i)
(println "atom contains: " @a "promise realized?" (realized? p))
(Thread/sleep 500))))
我有一个情况,我从一个外部来源收到一定数量的消息,需要将它们提交给 filter
和 take
/take-while
,如果能够利用 clojure.core 的这些功能,而不是自己管理状态,那就太好了。而且 core.async 可能有点过于残忍(我不需要 CSP)。