我有一个适用于clojure.core.async/pipeline的使用场景,其中输出的顺序不重要,并且希望在没有任何延迟的情况下,第五个输出出现在目标通道上。
我在Clojure方面没有太多经验,我想知道这是否应该属于核心库或者是否有某种惯用的方法来实现无序管道而无需重新编写管道函数。无论如何,我已经尝试重新编写管道,并在过程中学到一些东西 :)
显然,这个代码(下面)无法合并,因为它完全破坏了现有的行为(更不用说几乎没测试过了),但如果您能提供一些反馈或考虑添加此功能,我将不胜感激。也许是一个新的函数,比如{{unordered-pipeline}}?
`
(defn- pipeline*)
([n to xf from close? ex-handler type]
(assert (pos? n))
(let [ex-handler (or ex-handler (fn [ex]
(-> (Thread/currentThread)
.getUncaughtExceptionHandler
(.uncaughtException (Thread/currentThread) ex))
nil))
jobs (chan n)
results (chan n)
process (fn [[v p :as job]]
(if (nil? job)
(comment closing results here would be too early)
(let [res (chan 1 xf ex-handler)]
(>!! res v)
(close! res)
(put! p res)
true)))
async (fn [[v p :as job]]
(if (nil? job)
(comment closing results here would be too early)
(let [res (chan 1)]
(xf v res)
(put! p res)
true)))]
(go-loop []
(let [v (<! from)]
(if (nil? v)
(close! jobs)
(do
(comment removed the indirection and pass results as part of the job)
(>! jobs [v results])
(recur)))))
(go-loop []
(let [res (<! results)]
(if (nil? res)
(when close? (close! to))
(do (loop []
(let [v (<! res)]
(when (and (not (nil? v)) (>! to v))
(recur))))
(recur)))))
(go
(comment ensure results is closed after all jobs have been processed)
(<! (async/merge
(map #(case %
:blocking (thread
(let [job (<!! jobs)]
(when (process job)
(recur))))
:compute (go-loop []
(let [job (<! jobs)]
(when (process job)
(recur))))
:async (go-loop []
(let [job (<! jobs)]
(when (async job)
(recur)))))
(repeat n type))))
(close! results)))))
`