请在2024年Clojure调查问卷中分享您的想法!

欢迎!请查看关于页面以获取有关如何使用此操作的一些更多信息。

0
core.async
编辑于

我遇到了以下错误

单个通道上允许的等待取出操作不得超过1024个

然而,我认为如果定义了一个指定大小的通道,put操作将会阻塞。

(def event-chan (chan 10))

我正在循环调用此操作

(>!! event-chan evt)

我的预期是如果有10个项目正在通道中等待,它将会阻塞。然而,这好像没有发生。我的代码正在读取数据库中的FIFO队列并进行处理。即使项目正在通道中等待,它似乎也不会阻塞。

编辑:已更新为复制粘贴完整的实际代码。

(defn process-edn [{:keys [data code]}]
; lots db processing code here
)
  
(def event-chan (chan 10))
  
(defn eat-now-or-later[evt]
    (if ( = (evt :code) :loadevt)
        (do (println "sync processing load event ")
            (process-edn evt))
        (>!! event-chan evt)
      ))
  
  (defn async-deque [shutdown?]
    (time! dq-call-processing-time
           (let [k-t (ftup/from commons/*JOB_FIFO_EVENT_QUEUE*)
                 fdb (cfdb/select-api-version cfdb/clj-fdb-api-version)]
             (go (while true (process-edn (<! event-chan))))

             (loop[evt (with-open [^Database db (cfdb/open fdb)] (fifo/dequeue db k-t))]
              
              (if (or (shutdown?)  (nil? evt)) 0
                  
                  (do                   
                   (eat-now-or-later evt)  
                   (recur (with-open [^Database db (cfdb/open fdb)] (fifo/dequeue db k-t))))
                  )
              )))
    (ptasks/print-stats))

上面的内容是函数定义。我从下面的守护进程代码中调用此函数

(ns dqdaemon.core
  (:require [shutdown.core :as shutdown])
  (:require [com.videocloudmanager.petikaa.dequeue-tasks :as dqt])
 )

;; A crude approximation of your application's state.
(def state (atom {}))

(defn shutdown?[] (false? (:running @state)))

(defn init [args]
  (swap! state assoc :running true)
  (shutdown/add-hook! ::descriptive-name #(do
                                            (swap! state assoc :running false)         
                                            (println "It's about to go down!")))
  )

(defn start []
  (while (:running @state)
    (println "tick")
     (dqt/async-deque shutdown?)
    (Thread/sleep 2000)))


;; Enable command-line invocation
(defn -main [& args]
  (init args)
  (start))

更新
我理解这不是阻塞put的问题。这是因为我的代码正在创建无限循环。

此代码在循环中被调用,循环的父方法async-dequ在无限循环中被调用。

(go (while true (process-edn (<! event-chan)))))

我将此代码从loop-recur移动到主方法中,以便在守护进程启动时初始化

1 个答案

0

已选中
 
最佳答案

我不知道你怎么从那段代码中得到该错误。你能发布实际的代码吗?或者发布一个可以重现问题的独立示例。

嗨 @alexmiller -- 感谢您的反馈。我已经粘贴了原始代码
错误消息指的是 takes - 不是 puts - 所以它与 >!! 没有关联。消息表明有1000+个 go blocks 或 threads 正在尝试从同一个 channel 中获取。代码仍然不太对劲,但问题可能是因为 `start` 在循环中调用 `async-deque`,而 `async-deque` 启动了一个无限循环,因此最终会有大量的 go 循环在运行。
...