分享您的想法,参加2024 Clojure 状况调查!

欢迎!请参阅关于页面,以了解更多关于此工作方式的信息。

0
core.async
编辑

我遇到了以下错误

单个通道上不允许超过1024个挂起的take操作

然而,我以为如果我为通道定义了一个特定大小的规格,put将会阻塞。

(def event-chan (chan 10))

我在循环中调用这个函数

(>!! event-chan evt)

我的预期是当通道中有10个未处理的项目时将阻塞,然而这并没有发生。我的代码正在读取数据库中的fifo队列并进行处理。它似乎即使通道中有未处理的项目也不会阻塞。

编辑:已更新为复制粘贴完整的实际代码。

(defn process-edn [{:keys [data code]}]
; lots db processing code here
)
  
(def event-chan (chan 10))
  
(defn eat-now-or-later[evt]
    (if ( = (evt :code) :loadevt)
        (do (println "sync processing load event ")
            (process-edn evt))
        (>!! event-chan evt)
      ))
  
  (defn async-deque [shutdown?]
    (time! dq-call-processing-time
           (let [k-t (ftup/from commons/*JOB_FIFO_EVENT_QUEUE*)
                 fdb (cfdb/select-api-version cfdb/clj-fdb-api-version)]
             (go (while true (process-edn (<! event-chan))))

             (loop[evt (with-open [^Database db (cfdb/open fdb)] (fifo/dequeue db k-t))]
              
              (if (or (shutdown?)  (nil? evt)) 0
                  
                  (do                   
                   (eat-now-or-later evt)  
                   (recur (with-open [^Database db (cfdb/open fdb)] (fifo/dequeue db k-t))))
                  )
              )))
    (ptasks/print-stats))

上面的内容是函数定义。我正在从下面的守护进程代码中调用此函数

(ns dqdaemon.core
  (:require [shutdown.core :as shutdown])
  (:require [com.videocloudmanager.petikaa.dequeue-tasks :as dqt])
 )

;; A crude approximation of your application's state.
(def state (atom {}))

(defn shutdown?[] (false? (:running @state)))

(defn init [args]
  (swap! state assoc :running true)
  (shutdown/add-hook! ::descriptive-name #(do
                                            (swap! state assoc :running false)         
                                            (println "It's about to go down!")))
  )

(defn start []
  (while (:running @state)
    (println "tick")
     (dqt/async-deque shutdown?)
    (Thread/sleep 2000)))


;; Enable command-line invocation
(defn -main [& args]
  (init args)
  (start))

更新
我明白这不是阻塞put的问题。问题是因为我的代码正在创建无限循环。

此代码在循环中被调用,而该循环的父方法async-deque则在一个永久循环中被调用。

(go (while true (process-edn (<! event-chan)))))

我将此代码移动到主方法中,以便在守护进程启动时初始化

1 个回答

0
by
已选定 by
 
最佳答案

我不太清楚您是如何从这段代码中产生那个错误的。您能贴上实际的代码或者一个独立重现问题的示例吗?

by
嗨 @alexmiller -- 感谢您的反馈。我已经粘贴了原始代码
by
错误信息提到的是 takes - 不是 puts - 所以它与 >!! 无关。信息表明有1000+个 go blocks 或 threads 都试图从同一个 channel 中取出。代码仍然没有完全加起来,但问题可能是由 `start` 在循环中调用 `async-deque` 引起的,`async-deque` 启动了一个无限循环的 go loop,所以最终会有大量的那些 go loops 运行。
...