2024 Clojure 状态调查中分享您的想法!

欢迎!请参阅关于页面以获取更多关于此功能的详细信息。

0投票
core.async
编辑

我得到以下错误

单个通道上不允许超过1024个挂起的摄取

然而,我本以为如果我定义一个具有指定大小的通道,则put操作将会阻塞。

(def event-chan (chan 10))

我正在循环调用此操作

(>!! event-chan evt)

我的预期是如果有10个项目在通道中挂起,操作将会阻塞。但这似乎并没有发生。我的代码正在从数据库中读取fifo队列并进行处理。即使在通道中项目挂起时,它也不会阻塞。

编辑:已更新为复制粘贴完整实际的代码。

(defn process-edn [{:keys [data code]}]
; lots db processing code here
)
  
(def event-chan (chan 10))
  
(defn eat-now-or-later[evt]
    (if ( = (evt :code) :loadevt)
        (do (println "sync processing load event ")
            (process-edn evt))
        (>!! event-chan evt)
      ))
  
  (defn async-deque [shutdown?]
    (time! dq-call-processing-time
           (let [k-t (ftup/from commons/*JOB_FIFO_EVENT_QUEUE*)
                 fdb (cfdb/select-api-version cfdb/clj-fdb-api-version)]
             (go (while true (process-edn (<! event-chan))))

             (loop[evt (with-open [^Database db (cfdb/open fdb)] (fifo/dequeue db k-t))]
              
              (if (or (shutdown?)  (nil? evt)) 0
                  
                  (do                   
                   (eat-now-or-later evt)  
                   (recur (with-open [^Database db (cfdb/open fdb)] (fifo/dequeue db k-t))))
                  )
              )))
    (ptasks/print-stats))

上述是函数定义。我是从下面的守护程序代码中调用此函数的。

(ns dqdaemon.core
  (:require [shutdown.core :as shutdown])
  (:require [com.videocloudmanager.petikaa.dequeue-tasks :as dqt])
 )

;; A crude approximation of your application's state.
(def state (atom {}))

(defn shutdown?[] (false? (:running @state)))

(defn init [args]
  (swap! state assoc :running true)
  (shutdown/add-hook! ::descriptive-name #(do
                                            (swap! state assoc :running false)         
                                            (println "It's about to go down!")))
  )

(defn start []
  (while (:running @state)
    (println "tick")
     (dqt/async-deque shutdown?)
    (Thread/sleep 2000)))


;; Enable command-line invocation
(defn -main [& args]
  (init args)
  (start))

更新
我明白这并不是阻塞式put操作的问题。这因为我的代码正在创建无限循环。

此代码正在循环调用中,它的父方法async-dequ正在永久循环中调用。

(go (while true (process-edn (<! event-chan)))))

我将此代码从loop-recur移动到主方法中,以在守护程序启动时初始化。

1 答案

0投票

已选出
 
最佳答案

我不明白您是如何从那段代码中得到这个错误。能否发布实际的代码或者一个独立的示例来重现问题?

嗨 @alexmiller -- 感谢您的反馈。我已经粘贴了原始代码
错误信息指的是 takes - 不是 puts - 所以这与 >!! 没有关系。信息表明有1000多个 go 块或线程都在尝试从同一个通道中获取。代码仍然有些不完整,但问题可能是因为 `start` 在循环中调用 `async-deque`,而 `async-deque` 开始了一个无尽的 go 循环,所以最终会有大量这些 go 循环比邻操作。
...