2024年Clojure调查!中分享您的想法。

欢迎!请查阅关于页面以获取更多如何使用本页面的信息。

0
core.async

我对clojure.core.async/pipeline有一个用例,其中输出的顺序不重要,并且希望在不需要等待第一个输出准备好之前,在没有表达式的to通道上准备好第n^th^个输出。

我在Clojure方面经验不多;我想知道这是否属于核心库的一部分,或者是否有某种惯用方法可以实现无序管道而无需重写管道函数。无论如何,我已经尝试重写管道,并且在这个过程中学到了一些东西 :)

显然,下面的代码(如下)不能合并,因为它完全破坏了现有的行为(更不用说我只进行了很少的测试),但如果您能提供反馈并考虑添加此功能,我将不胜感激。也许是一个新的函数,{{unordered-pipeline}}?

`
(defn- pipeline*)
([n to xf from close? ex-handler type]

 (assert (pos? n))
 (let [ex-handler (or ex-handler (fn [ex]
                                   (-> (Thread/currentThread)
                                       .getUncaughtExceptionHandler
                                       (.uncaughtException (Thread/currentThread) ex))
                                   nil))
       jobs (chan n)
       results (chan n)
       process (fn [[v p :as job]]
                 (if (nil? job)
                   (comment closing results here would be too early)
                   (let [res (chan 1 xf ex-handler)]
                     (>!! res v)
                     (close! res)
                     (put! p res)
                     true)))
       async (fn [[v p :as job]]
               (if (nil? job)
                 (comment closing results here would be too early)
                 (let [res (chan 1)]
                   (xf v res)
                   (put! p res)
                   true)))]
   (go-loop []
            (let [v (<! from)]
              (if (nil? v)
                (close! jobs)
                (do
                  (comment removed the indirection and pass results as part of the job)
                  (>! jobs [v results])
                  (recur)))))
   (go-loop []
            (let [res (<! results)]
              (if (nil? res)
                (when close? (close! to))
                (do (loop []
                      (let [v (<! res)]
                        (when (and (not (nil? v)) (>! to v))
                          (recur))))
                    (recur)))))
   (go
     (comment ensure results is closed after all jobs have been processed)
     (<! (async/merge
           (map #(case %
                   :blocking (thread
                               (let [job (<!! jobs)]
                                 (when (process job)
                                   (recur))))
                   :compute (go-loop []
                                     (let [job (<! jobs)]
                                       (when (process job)
                                         (recur))))
                   :async (go-loop []
                                   (let [job (<! jobs)]
                                     (when (async job)
                                       (recur)))))
                (repeat n type))))
     (close! results)))))

`

3 个回答

0

评论者:phiware

我只尝试了{{:compute}}场景...

(snooze [] (Thread/sleep (+ 5 (rand-int 10)))) (defn slowly [f] (fn ([] (snooze) (let [out (f)] (snooze) out)) ([r] (snooze) (let [out (f r)] (snooze) out)) ([r i] (snooze) (let [out (f r i)] (snooze) out)))) (def cout (chan)) (pipeline 3 cout (comp slowly (map inc)) (async/to-chan (range 18))) (<!! (async/into [] cout)) -> [1 3 2 4 5 6 8 9 7 10 12 11 13 14 15 16 17 18]

每组3个(n)以随机顺序输出 :)

0

评论者:lgs32a

我也遇到过这种情况。一个简单的方法是使用具有大小为0的下滴缓冲区的通道作为目标通道,并通过 put! 自身在操作中填充无序目标通道。如果您想要消除所有管道开销,可以轻松地启动 n 线程循环,它们从相同的通道获取 thunk(fn (link: ) (put! unordered-target-chan (heavy-calc...))),将结果放入您希望的目标通道。

0
...