这是用Java编写的Kafka Streams应用程序的样子
sb.table("input", Consumed.with(sl, sl))
.groupBy((k, v) -> KeyValue.pair(k / 10, v), Grouped.with(sl, sl))
.aggregate(() -> 0L,
(k, v, acc) -> acc + v,
(k, v, acc) -> acc - v,
Materialized.with(sl, sl))
.toStream()
.to("output", Produced.with(sl, sl));
同样用Clojure编写的应用程序是这样的
(-> sb
(.table "input" (topic->consumed data-in))
(.groupBy (key-value-mapper
(fn [k v] (KeyValue/pair (long (/ k 10)) v)))
(serdes->grouped "groupie" data-in))
(.aggregate (reify Initializer
(apply [_] 0))
(reify Aggregator
(apply [_ k v acc]
(+ acc v)))
(reify Aggregator
(apply [_ k v acc]
(- acc v)))
(serdes->materialised ...))
(.toStream)
(.to "output" (topic->produced data-out)))
如果我们有能力在期望SAM类型的Lambda中,我们可以这样编写
(-> sb
(.table "input" (topic->consumed data-in))
(.groupBy (fn [k v] (KeyValue/pair (long (/ k 10)) v))
(serdes->grouped "groupie" data-in))
(.aggregate (constantly 0)
(fn [k v acc] (+ acc v))
(fn [k v acc] (- acc v))
(serdes->materialised ...))
(.toStream)
(.to "output" (topic->produced data-out)))