我遇到了以下错误
单个通道上不允许超过1024个挂起的take操作
然而,我以为如果我为通道定义了一个特定大小的规格,put将会阻塞。
(def event-chan (chan 10))
我在循环中调用这个函数
(>!! event-chan evt)
我的预期是当通道中有10个未处理的项目时将阻塞,然而这并没有发生。我的代码正在读取数据库中的fifo队列并进行处理。它似乎即使通道中有未处理的项目也不会阻塞。
编辑:已更新为复制粘贴完整的实际代码。
(defn process-edn [{:keys [data code]}]
; lots db processing code here
)
(def event-chan (chan 10))
(defn eat-now-or-later[evt]
(if ( = (evt :code) :loadevt)
(do (println "sync processing load event ")
(process-edn evt))
(>!! event-chan evt)
))
(defn async-deque [shutdown?]
(time! dq-call-processing-time
(let [k-t (ftup/from commons/*JOB_FIFO_EVENT_QUEUE*)
fdb (cfdb/select-api-version cfdb/clj-fdb-api-version)]
(go (while true (process-edn (<! event-chan))))
(loop[evt (with-open [^Database db (cfdb/open fdb)] (fifo/dequeue db k-t))]
(if (or (shutdown?) (nil? evt)) 0
(do
(eat-now-or-later evt)
(recur (with-open [^Database db (cfdb/open fdb)] (fifo/dequeue db k-t))))
)
)))
(ptasks/print-stats))
上面的内容是函数定义。我正在从下面的守护进程代码中调用此函数
(ns dqdaemon.core
(:require [shutdown.core :as shutdown])
(:require [com.videocloudmanager.petikaa.dequeue-tasks :as dqt])
)
;; A crude approximation of your application's state.
(def state (atom {}))
(defn shutdown?[] (false? (:running @state)))
(defn init [args]
(swap! state assoc :running true)
(shutdown/add-hook! ::descriptive-name #(do
(swap! state assoc :running false)
(println "It's about to go down!")))
)
(defn start []
(while (:running @state)
(println "tick")
(dqt/async-deque shutdown?)
(Thread/sleep 2000)))
;; Enable command-line invocation
(defn -main [& args]
(init args)
(start))
更新
我明白这不是阻塞put的问题。问题是因为我的代码正在创建无限循环。
此代码在循环中被调用,而该循环的父方法async-deque则在一个永久循环中被调用。
(go (while true (process-edn (<! event-chan)))))
我将此代码移动到主方法中,以便在守护进程启动时初始化