_评论由:jwhitlark_发布
;; 我从实验kafka streams的一些临时代码中整理了这个。原始代码中补全了所有的reify的lambda表达式。
;; 我会再找出一些使用java.utils.funstion.*中内容的示例。
;; 这些中的一些是从一些franzy示例中复制过来的吗?
;; 注意,例如,
;;
https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/Predicate.html;; 与
;;
https://docs.oracle.com/javase/8/docs/api/java/util/function/Predicate.html
(ns utils
(:import (org.apache.kafka.streams.kstream Reducer KeyValueMapper ValueMapper Predicate))
(defmacro reducer [kv & body]
`(reify Reducer
(apply [_# ~(first kv) ~(second kv)]
~@body)))
;; public interface KeyValueMapper<K,V,R>
;; apply(K key, V value)
(defmacro kv-mapper [kv & body]
`(reify KeyValueMapper
(apply [_# ~(first kv) ~(second kv)]
~@body)))
;; public interface ValueMapper<V1,V2>
;; apply(V1 value)
(defmacro v-mapper [v & body]
`(reify ValueMapper
(apply [_# ~v]
~@body)))
(defmacro pred [kv & body]
`(reify Predicate
(test [_# ~(first kv) ~(second kv)]
~@body)))
;; 我就是这样使用的
(ns our-service.kafka-streams
(:require
[our-service.util :as k]
[clojure.string :as str]
(:import
(org.apache.kafka.streams StreamsConfig KafkaStreams KeyValue)
(org.apache.kafka.streams.kstream KStreamBuilder ValueMapper)))
(defn create-word-count-topology []
(let [builder (KStreamBuilder.)
init-stream (.stream builder (into-array ["streams-str-input"]))
wc (-> init-stream
(.flatMapValues (k/v-mapper [& value]
(str/split (apply str value) #"\s")))
(.map (k/kv-mapper [k v]
(KeyValue/pair v v)))
(.filter (k/pred [k v]
(println v)
(not= v "the")))
(.groupByKey)
(.count "CountStore")
show-item
;; 这需要使用 mapValues
(.mapValues (reify ValueMapper
(apply [_ v]
(println v)
(str v))))
(.toStream)
(.to "wordcount-output"))]
【构建 wc】)