2024 Clojure状态调查!中分享您的想法。

欢迎!有关如何使用本站的更多信息,请参阅关于页面。

0
core.async

我有clojure.core.async/pipeline的一个用例,其中输出的顺序不重要,并希望在to通道上获得第n个输出而无需等待第一个输出就绪。

我并没有太多关于Clojure的经验;我想知道这是否应该属于核心库,或者是否有某种惯用方式可以无序创建流水线而不重写流水线函数。无论如何,我已经尝试修改流水线,并且在过程中学到了一二。

显然,下面的代码(如下所示)不能合并,因为它完全破坏了现有的行为(更不用说它几乎没有经过测试),但如果您能提供反馈或考虑添加此功能,我将不胜感激。或许可以有一个新的函数,{{unordered-pipeline}}?

`
(defn- pipeline*)
([n to xf from close? ex-handler type]

 (assert (pos? n))
 (let [ex-handler (or ex-handler (fn [ex]
                                   (-> (Thread/currentThread)
                                       .getUncaughtExceptionHandler
                                       (.uncaughtException (Thread/currentThread) ex))
                                   nil))
       jobs (chan n)
       results (chan n)
       process (fn [[v p :as job]]
                 (if (nil? job)
                   (comment closing results here would be too early)
                   (let [res (chan 1 xf ex-handler)]
                     (>!! res v)
                     (close! res)
                     (put! p res)
                     true)))
       async (fn [[v p :as job]]
               (if (nil? job)
                 (comment closing results here would be too early)
                 (let [res (chan 1)]
                   (xf v res)
                   (put! p res)
                   true)))]
   (go-loop []
            (let [v (<! from)]
              (if (nil? v)
                (close! jobs)
                (do
                  (comment removed the indirection and pass results as part of the job)
                  (>! jobs [v results])
                  (recur)))))
   (go-loop []
            (let [res (<! results)]
              (if (nil? res)
                (when close? (close! to))
                (do (loop []
                      (let [v (<! res)]
                        (when (and (not (nil? v)) (>! to v))
                          (recur))))
                    (recur)))))
   (go
     (comment ensure results is closed after all jobs have been processed)
     (<! (async/merge
           (map #(case %
                   :blocking (thread
                               (let [job (<!! jobs)]
                                 (when (process job)
                                   (recur))))
                   :compute (go-loop []
                                     (let [job (<! jobs)]
                                       (when (process job)
                                         (recur))))
                   :async (go-loop []
                                   (let [job (<! jobs)]
                                     (when (async job)
                                       (recur)))))
                (repeat n type))))
     (close! results)))))

`

3 个答案

0

评论者:phiware

我只试过{{:compute}}场景...

(snooze [] (Thread/sleep (+ 5 (rand-int 10)))) (defn slowly [f] (fn ([] (snooze) (let [out (f)] (snooze) out)) ([r] (snooze) (let [out (f r)] (snooze) out)) ([r i] (snooze) (let [out (f r i)] (snooze) out)))) (def cout (chan)) (pipeline 3 cout (comp slowly (map inc)) (async/to-chan (range 18))) (<!! (async/into [] cout)) -> [1 3 2 4 5 6 8 9 7 10 12 11 13 14 15 16 17 18]

每组3个(n)数据以随机顺序输出 :)

0

评论者:lgs32a

我也遇到过这种情况。一种简单的方法是使用一个具有大小为0的掉落缓冲区的通道作为目标通道,并通过put!自己从op内部填充无序目标通道。如果你希望消除所有管道开销,可以非常容易地启动n个线程循环,这些循环从相同的通道中获取thunk(fn (link:) (put! unordered-target-chan (heavy-calc...))),在您的期望目标通道上放置结果。

0
by
参考: https://clojure.atlassian.net/browse/ASYNC-150 (由phiware报告)
...