2024年Clojure状态调查中分享您的想法!

欢迎!请查看关于页面了解有关如何使用本网站的一些更多信息。

0
core.async

我有一个clojure.core.async/pipeline的使用场景,其中输出的顺序不重要,并且希望在第n个输出在to通道中出现,而无需等待第一个输出准备好。

我在Clojure方面没有太多经验;我想知道这是否属于核心库的东西,或者是否有某种惯用的方式来实现无序管道而无需重写pipeline函数。无论如何,我尝试重写了pipeline,并在过程中学到了一些东西 :)

显然,下面的代码无法合并,因为它完全破坏了现有的行为(更不用说我只做了一点点测试了),但如果您能提供反馈,或者考虑添加此功能,我将不胜感激。也许是一个新的函数,比如{{unordered-pipeline}}?

`
(defn- pipeline*
([n to xf from close? ex-handler type]

 (assert (pos? n))
 (let [ex-handler (or ex-handler (fn [ex]
                                   (-> (Thread/currentThread)
                                       .getUncaughtExceptionHandler
                                       (.uncaughtException (Thread/currentThread) ex))
                                   nil))
       jobs (chan n)
       results (chan n)
       process (fn [[v p :as job]]
                 (if (nil? job)
                   (comment closing results here would be too early)
                   (let [res (chan 1 xf ex-handler)]
                     (>!! res v)
                     (close! res)
                     (put! p res)
                     true)))
       async (fn [[v p :as job]]
               (if (nil? job)
                 (comment closing results here would be too early)
                 (let [res (chan 1)]
                   (xf v res)
                   (put! p res)
                   true)))]
   (go-loop []
            (let [v (<! from)]
              (if (nil? v)
                (close! jobs)
                (do
                  (comment removed the indirection and pass results as part of the job)
                  (>! jobs [v results])
                  (recur)))))
   (go-loop []
            (let [res (<! results)]
              (if (nil? res)
                (when close? (close! to))
                (do (loop []
                      (let [v (<! res)]
                        (when (and (not (nil? v)) (>! to v))
                          (recur))))
                    (recur)))))
   (go
     (comment ensure results is closed after all jobs have been processed)
     (<! (async/merge
           (map #(case %
                   :blocking (thread
                               (let [job (<!! jobs)]
                                 (when (process job)
                                   (recur))))
                   :compute (go-loop []
                                     (let [job (<! jobs)]
                                       (when (process job)
                                         (recur))))
                   :async (go-loop []
                                   (let [job (<! jobs)]
                                     (when (async job)
                                       (recur)))))
                (repeat n type))))
     (close! results)))))

`

3 个答案

0

评论者:phiware

我只尝试了{{:compute}}场景...

(snooze [] (Thread/sleep (+ 5 (rand-int 10)))) (defn slowly [f] (fn ([] (snooze) (let [out (f)] (snooze) out)) ([r] (snooze) (let [out (f r)] (snooze) out)) ([r i] (snooze) (let [out (f r i)] (snooze) out)))) (def cout (chan)) (pipeline 3 cout (comp slowly (map inc)) (async/to-chan (range 18))) (<!! (async/into [] cout)) -> [1 3 2 4 5 6 8 9 7 10 12 11 13 14 15 16 17 18]

每组3个(即n)以随机顺序输出 :)

0

评论者:lgs32a

我也遇到过这种情况。完成它的简单方法是用一个带有大小为0的丢弃缓冲区的通道作为目标通道,并通过使用put! yourself从op内部填充无序的目标通道。如果您想消除所有管道开销,很简单,可以生成n个线程循环,从相同的通道读取thunk(fn (link: ) (put! unordered-target-chan (heavy-calc...))),将结果放入您希望的目标通道。

0
参考: https://clojure.atlassian.net/browse/ASYNC-150 (phiware提供信息)
...