这是使用Java编写的Kafka Streams应用的样子
sb.table("input", Consumed.with(sl, sl))
.groupBy((k, v) -> KeyValue.pair(k / 10, v), Grouped.with(sl, sl))
.aggregate(() -> 0L,
(k, v, acc) -> acc + v,
(k, v, acc) -> acc - v,
Materialized.with(sl, sl))
.toStream()
.to("output", Produced.with(sl, sl));
同样,使用Clojure编写的应用程序是这样的
(-> sb
(.table "input" (topic->consumed data-in))
(.groupBy (key-value-mapper
(fn [k v] (KeyValue/pair (long (/ k 10)) v)))
(serdes->grouped "groupie" data-in))
(.aggregate (reify Initializer
(apply [_] 0))
(reify Aggregator
(apply [_ k v acc]
(+ acc v)))
(reify Aggregator
(apply [_ k v acc]
(- acc v)))
(serdes->materialised ...))
(.toStream)
(.to "output" (topic->produced data-out)))
如果我们能使用Lambdas并且期望SAM类型的位置,我们就可以这样写
(-> sb
(.table "input" (topic->consumed data-in))
(.groupBy (fn [k v] (KeyValue/pair (long (/ k 10)) v))
(serdes->grouped "groupie" data-in))
(.aggregate (constantly 0)
(fn [k v acc] (+ acc v))
(fn [k v acc] (- acc v))
(serdes->materialised ...))
(.toStream)
(.to "output" (topic->produced data-out)))