_评论由: jwhitlark_ 提出
;; 我从一些试验 kafka streams 的杂乱代码中挖出了这个。原始代码中所有 reify 都填充了 Java8 lambda。
;; 我会再找到一个使用 java.utils.funstion.* 中内容示例的例子。
;; 有一部分是从 franz 示例或其他地方搬过来的吗?
;; 注意,例如,
;;
https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/Predicate.html;; 与
;;
https://docs.oracle.com/javase/8/docs/api/java/util/function/Predicate.html
(ns utils
(:import (org.apache.kafka.streams.kstream Reducer KeyValueMapper ValueMapper Predicate))
(defmacro reducer [kv & body]
`(reify Reducer
(apply [_# ~(first kv) ~(second kv)]
~@body)))
;; public interface KeyValueMapper<K,V,R>
;; apply(K key, V value)
(defmacro kv-mapper [kv & body]
`(reify KeyValueMapper
(apply [_# ~(first kv) ~(second kv)]
~@body)))
;; public interface ValueMapper<V1,V2>
;; apply(V1 value)
(defmacro v-mapper [v & body]
`(reify ValueMapper
(apply [_# ~v]
~@body)))
(defmacro pred [kv & body]
`(reify Predicate
(test [_# ~(first kv) ~(second kv)]
~@body)))
;; 我用它这样用
(ns our-service.kafka-streams
(:require
[our-service.util :as k]
[clojure.string :as str]
(:import
(org.apache.kafka.streams StreamsConfig KafkaStreams KeyValue)
(org.apache.kafka.streams.kstream KStreamBuilder ValueMapper)))
(defn create-word-count-topology []
(let [builder (KStreamBuilder.)
init-stream (.stream builder (into-array ["streams-str-input"]))
wc (-> init-stream
(.flatMapValues (k/v-mapper [& value]
`(str/split (apply str value) #"\s")))
(.map (k/kv-mapper [k v]
`(KeyValue/pair v v)))
(.filter (k/pred [k v]
( println v )
( not= v "the" )
(.groupByKey)
(.count "CountStore")
show-item
;; 这是一个需要mapValues的例子
(.mapValues (reify ValueMapper
(apply [_ v]
(println v)
(str v))))
(.toStream)
(.to "wordcount-output")
[建造者 wc]))