;; 从一些针对kafka streams的实验代码中找到的。原始的reify全部填满了java8 lambda。
;; 我会找到另一个显示使用java.utils.funstion.*中内容的例子。
;; 其中一些是从franzy的示例或类似的地方借鉴的?
;; 注意,例如
https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/Predicate.html;; 不同于
(ns utils
(:import (org.apache.kafka.streams.kstream Reducer KeyValueMapper ValueMapper Predicate))
(defmacro reducer [kv & body]
`(reify Reducer
(apply [_# ~(first kv) ~(second kv)]
;; public interface KeyValueMapper<K,V,R>
;; apply(K key, V value)
(defmacro kv-mapper [kv & body]
`(reify KeyValueMapper
(apply [_# ~(first kv) ~(second kv)]
;; public interface ValueMapper<V1,V2>
;; apply(V1 value)
(defmacro v-mapper [v & body]
`(reify ValueMapper
(apply [_# ~v]
(defmacro pred [kv & body]
`(reify Predicate
(test [_# ~(first kv) ~(second kv)]
;; 我这样使用它
(ns our-service.kafka-streams
[our-service.util :as k]
[clojure.string :as str]
(org.apache.kafka.streams StreamsConfig KafkaStreams KeyValue)
(org.apache.kafka.streams.kstream KStreamBuilder ValueMapper))
(defn create-word-count-topology []
(let [builder (KStreamBuilder.)
init-stream (.stream builder (into-array ["streams-str-input"]))
wc (-> init-stream
(.flatMapValues (k/v-mapper [& value]
(str/split (apply str value) #"\s")))
(.map (k/kv-mapper [k v]
(KeyValue/pair v v)))
(.filter (k/pred [k v]
(println v))
(not= v "the")))
(.count "CountStore")
;; this needs to be mapValues
(.mapValues (reify ValueMapper
(apply [_ v]
(println v))
(str v)))
(.to "wordcount-output"))]
[builder wc])