2024 Clojure 状态调查!分享你的想法。

欢迎!请参阅关于页面以了解如何工作的更多信息。

0
core.async
编辑

我遇到了以下错误

单通道中不允许超过1024个挂起的获取操作

然而,我以为如果我定义了一个具有指定大小的通道,则放置操作会阻塞。

(def event-chan (chan 10))

我正在循环中调用此操作

(>!! event-chan evt)

我的预期是如果通道中有10个挂起项,则会阻塞。然而,似乎并没有发生。我的代码正在读取数据库中的fifo队列并进行处理。即使通道中有挂起项,似乎也不会阻塞。

编辑:已更新为复制粘贴实际的完整代码。

(defn process-edn [{:keys [data code]}]
; lots db processing code here
)
  
(def event-chan (chan 10))
  
(defn eat-now-or-later[evt]
    (if ( = (evt :code) :loadevt)
        (do (println "sync processing load event ")
            (process-edn evt))
        (>!! event-chan evt)
      ))
  
  (defn async-deque [shutdown?]
    (time! dq-call-processing-time
           (let [k-t (ftup/from commons/*JOB_FIFO_EVENT_QUEUE*)
                 fdb (cfdb/select-api-version cfdb/clj-fdb-api-version)]
             (go (while true (process-edn (<! event-chan))))

             (loop[evt (with-open [^Database db (cfdb/open fdb)] (fifo/dequeue db k-t))]
              
              (if (or (shutdown?)  (nil? evt)) 0
                  
                  (do                   
                   (eat-now-or-later evt)  
                   (recur (with-open [^Database db (cfdb/open fdb)] (fifo/dequeue db k-t))))
                  )
              )))
    (ptasks/print-stats))

上述是函数定义。我从下面的守护进程代码中调用此函数

(ns dqdaemon.core
  (:require [shutdown.core :as shutdown])
  (:require [com.videocloudmanager.petikaa.dequeue-tasks :as dqt])
 )

;; A crude approximation of your application's state.
(def state (atom {}))

(defn shutdown?[] (false? (:running @state)))

(defn init [args]
  (swap! state assoc :running true)
  (shutdown/add-hook! ::descriptive-name #(do
                                            (swap! state assoc :running false)         
                                            (println "It's about to go down!")))
  )

(defn start []
  (while (:running @state)
    (println "tick")
     (dqt/async-deque shutdown?)
    (Thread/sleep 2000)))


;; Enable command-line invocation
(defn -main [& args]
  (init args)
  (start))

更新
我明白这不是阻塞放置的问题。是因为我的代码创建了无限的多重循环。

此代码在循环中被调用,并且是其父方法async-dequ在永久循环中被调用。

(go (while true (process-edn (<! event-chan)))))

我将此代码从loop-recur中移出,放入主方法中以在守护进程启动时初始化

1 答案

0
作者:
被选中 由:
 
最佳回答

我不太明白您如何从那段代码中得到这个错误。您能否提供实际的代码或一个重现问题的独立实例?

作者:
嗨 @alexmiller -- 感谢您的反馈。我已经粘贴了原始代码
作者:
错误信息指的是取(takes),而不是放(puts),所以它与 >!! 无关。信息表明有1000多个go块或线程正在尝试从同一个通道取数据。代码仍然不太完整,但问题可能是由于 `start` 在循环中调用 `async-deque` ,而 `async-deque` 启动了一个无限go循环,所以最终会有大量的这些go循环在运行。
...