我遇到了以下错误
单通道中不允许超过1024个挂起的获取操作
然而,我以为如果我定义了一个具有指定大小的通道,则放置操作会阻塞。
(def event-chan (chan 10))
我正在循环中调用此操作
(>!! event-chan evt)
我的预期是如果通道中有10个挂起项,则会阻塞。然而,似乎并没有发生。我的代码正在读取数据库中的fifo队列并进行处理。即使通道中有挂起项,似乎也不会阻塞。
编辑:已更新为复制粘贴实际的完整代码。
(defn process-edn [{:keys [data code]}]
; lots db processing code here
)
(def event-chan (chan 10))
(defn eat-now-or-later[evt]
(if ( = (evt :code) :loadevt)
(do (println "sync processing load event ")
(process-edn evt))
(>!! event-chan evt)
))
(defn async-deque [shutdown?]
(time! dq-call-processing-time
(let [k-t (ftup/from commons/*JOB_FIFO_EVENT_QUEUE*)
fdb (cfdb/select-api-version cfdb/clj-fdb-api-version)]
(go (while true (process-edn (<! event-chan))))
(loop[evt (with-open [^Database db (cfdb/open fdb)] (fifo/dequeue db k-t))]
(if (or (shutdown?) (nil? evt)) 0
(do
(eat-now-or-later evt)
(recur (with-open [^Database db (cfdb/open fdb)] (fifo/dequeue db k-t))))
)
)))
(ptasks/print-stats))
上述是函数定义。我从下面的守护进程代码中调用此函数
(ns dqdaemon.core
(:require [shutdown.core :as shutdown])
(:require [com.videocloudmanager.petikaa.dequeue-tasks :as dqt])
)
;; A crude approximation of your application's state.
(def state (atom {}))
(defn shutdown?[] (false? (:running @state)))
(defn init [args]
(swap! state assoc :running true)
(shutdown/add-hook! ::descriptive-name #(do
(swap! state assoc :running false)
(println "It's about to go down!")))
)
(defn start []
(while (:running @state)
(println "tick")
(dqt/async-deque shutdown?)
(Thread/sleep 2000)))
;; Enable command-line invocation
(defn -main [& args]
(init args)
(start))
更新
我明白这不是阻塞放置的问题。是因为我的代码创建了无限的多重循环。
此代码在循环中被调用,并且是其父方法async-dequ
在永久循环中被调用。
(go (while true (process-edn (<! event-chan)))))
我将此代码从loop-recur
中移出,放入主方法中以在守护进程启动时初始化