我对clojure.core.async/pipeline有一个用例,其中输出的顺序不重要,并且希望在不需要等待第一个输出准备好之前,在没有表达式的to通道上准备好第n^th^个输出。
我在Clojure方面经验不多;我想知道这是否属于核心库的一部分,或者是否有某种惯用方法可以实现无序管道而无需重写管道函数。无论如何,我已经尝试重写管道,并且在这个过程中学到了一些东西 :)
显然,下面的代码(如下)不能合并,因为它完全破坏了现有的行为(更不用说我只进行了很少的测试),但如果您能提供反馈并考虑添加此功能,我将不胜感激。也许是一个新的函数,{{unordered-pipeline}}?
`
(defn- pipeline*)
([n to xf from close? ex-handler type]
(assert (pos? n))
(let [ex-handler (or ex-handler (fn [ex]
(-> (Thread/currentThread)
.getUncaughtExceptionHandler
(.uncaughtException (Thread/currentThread) ex))
nil))
jobs (chan n)
results (chan n)
process (fn [[v p :as job]]
(if (nil? job)
(comment closing results here would be too early)
(let [res (chan 1 xf ex-handler)]
(>!! res v)
(close! res)
(put! p res)
true)))
async (fn [[v p :as job]]
(if (nil? job)
(comment closing results here would be too early)
(let [res (chan 1)]
(xf v res)
(put! p res)
true)))]
(go-loop []
(let [v (<! from)]
(if (nil? v)
(close! jobs)
(do
(comment removed the indirection and pass results as part of the job)
(>! jobs [v results])
(recur)))))
(go-loop []
(let [res (<! results)]
(if (nil? res)
(when close? (close! to))
(do (loop []
(let [v (<! res)]
(when (and (not (nil? v)) (>! to v))
(recur))))
(recur)))))
(go
(comment ensure results is closed after all jobs have been processed)
(<! (async/merge
(map #(case %
:blocking (thread
(let [job (<!! jobs)]
(when (process job)
(recur))))
:compute (go-loop []
(let [job (<! jobs)]
(when (process job)
(recur))))
:async (go-loop []
(let [job (<! jobs)]
(when (async job)
(recur)))))
(repeat n type))))
(close! results)))))
`