我有clojure.core.async/pipeline的一个用例,其中输出的顺序不重要,并希望在to通道上获得第n个输出而无需等待第一个输出就绪。
我并没有太多关于Clojure的经验;我想知道这是否应该属于核心库,或者是否有某种惯用方式可以无序创建流水线而不重写流水线函数。无论如何,我已经尝试修改流水线,并且在过程中学到了一二。
显然,下面的代码(如下所示)不能合并,因为它完全破坏了现有的行为(更不用说它几乎没有经过测试),但如果您能提供反馈或考虑添加此功能,我将不胜感激。或许可以有一个新的函数,{{unordered-pipeline}}?
`
(defn- pipeline*)
([n to xf from close? ex-handler type]
(assert (pos? n))
(let [ex-handler (or ex-handler (fn [ex]
(-> (Thread/currentThread)
.getUncaughtExceptionHandler
(.uncaughtException (Thread/currentThread) ex))
nil))
jobs (chan n)
results (chan n)
process (fn [[v p :as job]]
(if (nil? job)
(comment closing results here would be too early)
(let [res (chan 1 xf ex-handler)]
(>!! res v)
(close! res)
(put! p res)
true)))
async (fn [[v p :as job]]
(if (nil? job)
(comment closing results here would be too early)
(let [res (chan 1)]
(xf v res)
(put! p res)
true)))]
(go-loop []
(let [v (<! from)]
(if (nil? v)
(close! jobs)
(do
(comment removed the indirection and pass results as part of the job)
(>! jobs [v results])
(recur)))))
(go-loop []
(let [res (<! results)]
(if (nil? res)
(when close? (close! to))
(do (loop []
(let [v (<! res)]
(when (and (not (nil? v)) (>! to v))
(recur))))
(recur)))))
(go
(comment ensure results is closed after all jobs have been processed)
(<! (async/merge
(map #(case %
:blocking (thread
(let [job (<!! jobs)]
(when (process job)
(recur))))
:compute (go-loop []
(let [job (<! jobs)]
(when (process job)
(recur))))
:async (go-loop []
(let [job (<! jobs)]
(when (async job)
(recur)))))
(repeat n type))))
(close! results)))))
`