_评论由:jwhitlark_ 发布:
;;我从实验kafka streams的一些草稿代码中挖出了这个代码。
;;我会再找一个例子来展示如何使用java.util.function.*。
;;这些部分是否是从某个franzy示例中借用过来的?
;;注意,例如,以下两者之间的区别:
;; https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/Predicate.html
;; 与
;; https://docs.oracle.com/javase/8/docs/api/java/util/function/Predicate.html
;; Namespace holding helper macros that wrap Kafka Streams functional
;; interfaces (Reducer, KeyValueMapper, ValueMapper, Predicate) in `reify`.
;; The closing paren of the `ns` form was missing; without it every following
;; top-level form would be read as part of the `ns` declaration.
(ns utils
  (:import (org.apache.kafka.streams.kstream Reducer KeyValueMapper ValueMapper Predicate)))
(defmacro reducer
  "Expands to a `reify` of `Reducer` whose `apply` method runs `body`.

  `kv` is a binding vector; its first and second elements name the two
  arguments of `apply` (after the implicit `this`). The value of the last
  form in `body` is the method's return value."
  [kv & body]
  (let [[left right] kv]
    `(reify Reducer
       (apply [_# ~left ~right]
         ~@body))))
;; public interface KeyValueMapper<K,V,R>
;; apply(K key, V value)
(defmacro kv-mapper
  "Expands to a `reify` of `KeyValueMapper` whose `apply` method runs `body`.

  `kv` is a binding vector; its first element names the key argument and its
  second element names the value argument. The value of the last form in
  `body` is the method's return value."
  [kv & body]
  (let [[key-sym value-sym] kv]
    `(reify KeyValueMapper
       (apply [_# ~key-sym ~value-sym]
         ~@body))))
;; public interface ValueMapper<V1,V2>
;; apply(V1 value)
(defmacro v-mapper
  "Expands to a `reify` of `ValueMapper` whose `apply` method runs `body`.

  `v` is spliced in as the single argument binding of `apply` (after the
  implicit `this`). The value of the last form in `body` is the method's
  return value."
  [v & body]
  (let [value-binding v]
    `(reify ValueMapper
       (apply [_# ~value-binding]
         ~@body))))
(defmacro pred
  "Expands to a `reify` of `Predicate` whose `test` method runs `body`.

  `kv` is a binding vector; its first and second elements name the two
  arguments of `test` (after the implicit `this`). The value of the last
  form in `body` is the method's return value."
  [kv & body]
  (let [[key-sym value-sym] kv]
    `(reify Predicate
       (test [_# ~key-sym ~value-sym]
         ~@body))))
;; 我大概是这样使用它的:
;; Namespace for the word-count usage example.
;; The `:require` clause was never closed, which nested `:import` inside it and
;; left the whole `ns` form unbalanced; each clause is now closed on its own.
;; NOTE(review): this requires `our-service.util`, while the helper macros in
;; this file are declared under `(ns utils ...)` — confirm which name is intended.
(ns our-service.kafka-streams
  (:require
    [our-service.util :as k]
    [clojure.string :as str])
  (:import
    (org.apache.kafka.streams StreamsConfig KafkaStreams KeyValue)
    (org.apache.kafka.streams.kstream KStreamBuilder ValueMapper)))
(defn create-word-count-topology
  "Builds a word-count pipeline on a fresh `KStreamBuilder`.

  Steps, in order: read the \"streams-str-input\" topic, split each value on
  the regex `\\s`, re-key every record so key and value are both the word,
  print each word and drop the ones equal to \"the\", group by key, count into
  the \"CountStore\" store, print and stringify each count, convert back to a
  stream and send it to \"wordcount-output\".

  Returns the vector `[builder wc]`, where `wc` is whatever the final `.to`
  call returns."
  []
  (let [builder     (KStreamBuilder.)
        ;; Topic names are passed as an array — presumably because `.stream`
        ;; is a Java varargs method (TODO confirm against the Kafka version in use).
        init-stream (.stream builder (into-array ["streams-str-input"]))
        wc          (-> init-stream
                        (.flatMapValues (k/v-mapper [& value]
                                          (str/split (apply str value) #"\s")))
                        ;; Use the word itself as both key and value.
                        (.map (k/kv-mapper [k v]
                                (KeyValue/pair v v)))
                        ;; The println is a debugging aid; the last form decides
                        ;; whether the record is kept.
                        (.filter (k/pred [k v]
                                   (println v)
                                   (not= v "the")))
                        (.groupByKey)
                        (.count "CountStore")
                        ;; NOTE(review): the source had an unparseable
                        ;; "show items" token at this point (possibly a debug
                        ;; helper threaded into the pipeline) — restore it here
                        ;; if such a helper exists.
                        ;; This step has to be mapValues.
                        (.mapValues (reify ValueMapper
                                      (apply [_ v]
                                        (println v)
                                        (str v))))
                        (.toStream)
                        (.to "wordcount-output"))]
    [builder wc]))