我有一个clojure.core.async/pipeline的使用场景,其中输出的顺序不重要,并且希望在第n个输出在to通道中出现,而无需等待第一个输出准备好。
我在Clojure方面没有太多经验;我想知道这是否属于核心库的东西,或者是否有某种惯用的方式来实现无序管道而无需重写pipeline函数。无论如何,我尝试重写了pipeline,并在过程中学到了一些东西 :)
显然,下面的代码无法合并,因为它完全破坏了现有的行为(更不用说我只做了一点点测试了),但如果您能提供反馈,或者考虑添加此功能,我将不胜感激。也许是一个新的函数,比如{{unordered-pipeline}}?
`
(defn- pipeline*
([n to xf from close? ex-handler type]
(assert (pos? n))
(let [ex-handler (or ex-handler (fn [ex]
(-> (Thread/currentThread)
.getUncaughtExceptionHandler
(.uncaughtException (Thread/currentThread) ex))
nil))
jobs (chan n)
results (chan n)
process (fn [[v p :as job]]
(if (nil? job)
(comment closing results here would be too early)
(let [res (chan 1 xf ex-handler)]
(>!! res v)
(close! res)
(put! p res)
true)))
async (fn [[v p :as job]]
(if (nil? job)
(comment closing results here would be too early)
(let [res (chan 1)]
(xf v res)
(put! p res)
true)))]
(go-loop []
(let [v (<! from)]
(if (nil? v)
(close! jobs)
(do
(comment removed the indirection and pass results as part of the job)
(>! jobs [v results])
(recur)))))
(go-loop []
(let [res (<! results)]
(if (nil? res)
(when close? (close! to))
(do (loop []
(let [v (<! res)]
(when (and (not (nil? v)) (>! to v))
(recur))))
(recur)))))
(go
(comment ensure results is closed after all jobs have been processed)
(<! (async/merge
(map #(case %
:blocking (thread
(let [job (<!! jobs)]
(when (process job)
(recur))))
:compute (go-loop []
(let [job (<! jobs)]
(when (process job)
(recur))))
:async (go-loop []
(let [job (<! jobs)]
(when (async job)
(recur)))))
(repeat n type))))
(close! results)))))
`