我遇到了以下错误
单个通道上允许的等待取出操作不得超过1024个
然而,我认为如果定义了一个指定大小的通道,put操作将会阻塞。
(def event-chan (chan 10))
我正在循环调用此操作
(>!! event-chan evt)
我的预期是如果有10个项目正在通道中等待,它将会阻塞。然而,这好像没有发生。我的代码正在读取数据库中的FIFO队列并进行处理。即使项目正在通道中等待,它似乎也不会阻塞。
编辑:已更新为复制粘贴完整的实际代码。
(defn process-edn [{:keys [data code]}]
; lots db processing code here
)
(def event-chan (chan 10))
(defn eat-now-or-later[evt]
(if ( = (evt :code) :loadevt)
(do (println "sync processing load event ")
(process-edn evt))
(>!! event-chan evt)
))
(defn async-deque [shutdown?]
(time! dq-call-processing-time
(let [k-t (ftup/from commons/*JOB_FIFO_EVENT_QUEUE*)
fdb (cfdb/select-api-version cfdb/clj-fdb-api-version)]
(go (while true (process-edn (<! event-chan))))
(loop[evt (with-open [^Database db (cfdb/open fdb)] (fifo/dequeue db k-t))]
(if (or (shutdown?) (nil? evt)) 0
(do
(eat-now-or-later evt)
(recur (with-open [^Database db (cfdb/open fdb)] (fifo/dequeue db k-t))))
)
)))
(ptasks/print-stats))
上面的内容是函数定义。我从下面的守护进程代码中调用此函数
(ns dqdaemon.core
(:require [shutdown.core :as shutdown])
(:require [com.videocloudmanager.petikaa.dequeue-tasks :as dqt])
)
;; A crude approximation of your application's state.
(def state (atom {}))
(defn shutdown?[] (false? (:running @state)))
(defn init [args]
(swap! state assoc :running true)
(shutdown/add-hook! ::descriptive-name #(do
(swap! state assoc :running false)
(println "It's about to go down!")))
)
(defn start []
(while (:running @state)
(println "tick")
(dqt/async-deque shutdown?)
(Thread/sleep 2000)))
;; Enable command-line invocation
(defn -main [& args]
(init args)
(start))
更新
我理解这不是阻塞put的问题。这是因为我的代码正在创建无限循环。
此代码在循环中被调用,循环的父方法async-dequ在无限循环中被调用。
(go (while true (process-edn (<! event-chan)))))
我将此代码从loop-recur移动到主方法中,以便在守护进程启动时初始化