2024 年 Clojure 状态调查! 分享您的想法。

欢迎!有关如何使用本网站的信息,请参阅 关于 页面。

0
core.async

我们在 core.async 中遇到了一个棘手的问题。

在某些情况下,即使实际只有 1024 个挂起接收操作,但在异步分发线程中也会引发 "不允许超过 1024 个挂起接收操作"。
可以通过以下代码快速触发该情况:

`
(dotimes [_ 5000]
;; - 启动 1024 个线程,所有线程都通过 alt! 在一个公共通道 (ch) 和单个通道 (x) 上进行操作
;; - 公共通道始终为 '空'
;; - 通过 x 通道唤醒所有线程两次,使它们循环。
;; - 这会在异步分发线程中快速引发多个 '不允许在单个通道上超过 1024 个挂起接收操作' 断言。
(let [ch (async/chan)

    threads clojure.core.async.impl.protocols/MAX-QUEUE-SIZE
    wakeups 2
    x-chans
    (mapv (fn [n]
            (let [x (async/chan)]
              (async/go-loop []
                (async/alt!
                  ch nil
                  x ([_] (recur))))
              x))
          (range threads))]
(doseq [x x-chans]
  (dotimes [n wakeups]
    (async/>!! x 42)))
(async/close! ch)))

`

环境
Mac OS, JDK 8, Clojure 1.10.0, core-async 0.6.532

1 答案

0

尊敬的 Sperber 先生,

我认为我对这个问题理解得并不深刻。(...也...坦白说,我并没有真正花太多时间去思考这个问题......)

话虽如此,我刚才运行了以下操作

(ns asyncbug.core
  (:gen-class)
  (:require [clojure.core.async :as async]))


(defn -main
  "I don't do a whole lot ... yet."
  [& args]

  (dotimes [n 5000]
    (comment "
- start 1024 threads that all alt! on a common channel (ch) and
  on an individual channel (x)
- the common channel remains 'empty' all the time
- wakeup all theads twice via the x channel, which makes them loop.
- this quickly raises multiple
  'No more than 1024 pending takes are allowed on a single channel.'
  assertions in an async-dispatch thread.
")
    (let [ch (async/chan)

          threads clojure.core.async.impl.protocols/MAX-QUEUE-SIZE
          wakeups 2
          x-chans
          (mapv (fn [n]
                  (let [x (async/chan)]
                    (async/go-loop []
                      (async/alt!
                        ch nil
                        x ([_] (recur))))
                    x))
                (range threads))]
      (doseq [x x-chans]
        (dotimes [n wakeups]
          (async/>!! x 42)))
      (async/close! ch))

    (println "iteration: " n))

  (println "Hello, World!"))

(defproject asyncbug "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0"
            :url "https://www.eclipse.org/legal/epl-2.0/"}
  :dependencies [[org.clojure/clojure "1.10.0"]
                 [org.clojure/core.async "0.7.559"]]
  :main ^:skip-aot asyncbug.core
  :target-path "target/%s"
  :profiles {:uberjar {:aot :all}})

java -version
openjdk version "11.0.6" 2020-01-14
OpenJDK Runtime Environment (build 11.0.6+10-post-Ubuntu-1ubuntu118.04.1)
OpenJDK 64-Bit Server VM (build 11.0.6+10-post-Ubuntu-1ubuntu118.04.1, mixed mode, sharing)

最后得到了 "Hello, World!"。

再次......我对这里正在(应该)发生的事情并不是很确定......因此,我可能整个操作都是错误的......

我还只用 core-async 0.6.532 版本快速试了一下,但在成功地运行了 32 次迭代之后中止了......在这之前还没有引发任何 AssertionError。

很抱歉无法提供更多的帮助。

最好的祝愿,
Florian Tauber

...