2024 Clojure 状态调查!中分享您的想法。

欢迎!请参阅关于页面以获取更多关于工作方式的信息。

+4
转换器

在 Rich Hickey 的 talk about Transducers (2014) 中,他谈到存在的“转换器上下文”“今天”—— 集合和 core.async,以及可能“明天”的“可观察对象”。

现在过去五年了,我想知道,还有哪些在现实世界中创建并使用的其他转换器上下文?

4 个答案

+4

xforms 库中有几个。

+1

我认为当您需要时,您将创建自己的上下文 - 例如,我们处理成千上万来自 WebSocket 的事件,并将它们写入 Kafka 主题,其中有不同的下游消费者多次处理它们。转换器对这些操作非常合适,并且我们广泛使用它们。

0 votes
by

我对这个想法把玩了一段时间,并想分享一下。基本方法是在保持记录结果是否为 reduced? 的同时,向一个转置原子中添加东西。有点像你给 core.async/chan 提供一个转置器的样子。

(defn mkadder
  "returns a function which, given a value, adds that value the the atom a with the reducing function rf. If the result is reduced, deliver the dereferenced result to promise p."
  [a p rf]
  (fn [x]
    (let [result (rf a x)]
      (if (reduced? result)
        (do (deliver p @@result) @result) ;;what to return?
        result))))

(defn acc
  "accumulates state in an atom subject to a transducer. returns a map
  with the keys :adder, :a and :p. Use the :add! function to add
  state. :p is a promise which will be delivered the state in a when
  rf is realized"
  ([xf rf] (acc xf rf (rf)))
  ([xf rf init]
   (let [a       (atom init)
         swapper (fn [acc x] (doto acc (swap! rf x)))
         rf      (xf swapper)
         p       (promise)]
     {:add! (mkadder a p rf) :a a :p p})))

;;demo
(let [{:keys [add! a p]}
      (acc (comp (map inc) (filter even?) (take 5)) conj)]
  (future
    (doseq [i (range 20)]
      (add! i)
      (println "atom contains: " @a "promise realized?" (realized? p))
      
      (Thread/sleep 500))))

我有一个场景,来自外部来源有固定数量的传入消息需要经过 filtertake/take-while 处理。能够在不需要自己管理状态的情况下利用这些 clojure.core 功能会很好,而.core.async 有点过于强大了(我不需要 CSP)。

...