2024 Clojure现状调查中分享您的想法!

欢迎!请查看关于页面以了解更多关于该功能的信息。

0
core.async

我有一个适用于clojure.core.async/pipeline的使用场景,其中输出的顺序不重要,并且希望在没有任何延迟的情况下,第五个输出出现在目标通道上。

我在Clojure方面没有太多经验,我想知道这是否应该属于核心库或者是否有某种惯用的方法来实现无序管道而无需重新编写管道函数。无论如何,我已经尝试重新编写管道,并在过程中学到一些东西 :)

显然,这个代码(下面)无法合并,因为它完全破坏了现有的行为(更不用说几乎没测试过了),但如果您能提供一些反馈或考虑添加此功能,我将不胜感激。也许是一个新的函数,比如{{unordered-pipeline}}?

`
(defn- pipeline*)
([n to xf from close? ex-handler type]

 (assert (pos? n))
 (let [ex-handler (or ex-handler (fn [ex]
                                   (-> (Thread/currentThread)
                                       .getUncaughtExceptionHandler
                                       (.uncaughtException (Thread/currentThread) ex))
                                   nil))
       jobs (chan n)
       results (chan n)
       process (fn [[v p :as job]]
                 (if (nil? job)
                   (comment closing results here would be too early)
                   (let [res (chan 1 xf ex-handler)]
                     (>!! res v)
                     (close! res)
                     (put! p res)
                     true)))
       async (fn [[v p :as job]]
               (if (nil? job)
                 (comment closing results here would be too early)
                 (let [res (chan 1)]
                   (xf v res)
                   (put! p res)
                   true)))]
   (go-loop []
            (let [v (<! from)]
              (if (nil? v)
                (close! jobs)
                (do
                  (comment removed the indirection and pass results as part of the job)
                  (>! jobs [v results])
                  (recur)))))
   (go-loop []
            (let [res (<! results)]
              (if (nil? res)
                (when close? (close! to))
                (do (loop []
                      (let [v (<! res)]
                        (when (and (not (nil? v)) (>! to v))
                          (recur))))
                    (recur)))))
   (go
     (comment ensure results is closed after all jobs have been processed)
     (<! (async/merge
           (map #(case %
                   :blocking (thread
                               (let [job (<!! jobs)]
                                 (when (process job)
                                   (recur))))
                   :compute (go-loop []
                                     (let [job (<! jobs)]
                                       (when (process job)
                                         (recur))))
                   :async (go-loop []
                                   (let [job (<! jobs)]
                                     (when (async job)
                                       (recur)))))
                (repeat n type))))
     (close! results)))))

`

3 个回答

0

评论者:phiware

我仅尝试了{{:compute}}场景...

(snooze [] (Thread/sleep (+ 5 (rand-int 10)))) (defn slowly [f] (fn ([] (snooze) (let [out (f)] (snooze) out)) ([r] (snooze) (let [out (f r)] (snooze) out)) ([r i] (snooze) (let [out (f r i)] (snooze) out)))) (def cout (chan)) (pipeline 3 cout (comp slowly (map inc)) (async/to-chan (range 18))) (<!! (async/into [] cout)) -> [1 3 2 4 5 6 8 9 7 10 12 11 13 14 15 16 17 18]

每组3个(n)的输出以随机顺序输出 :)

0

评论者:lgs32a

我也遇到过这种情况。一个简单的方法是使用一个大小为0的掉落缓冲区的通道作为目标通道,并通过op内部的put!自行填充未排序的目标通道。如果你想消除所有管道开销,很容易生成n个线程循环,它们从同一个通道获取thunk(fn (link: ) (put! unordered-target-chan (heavy-calc...))),该thunk将结果放入你希望的目标通道中。

0
...