请在 2024 年 Clojure 状态调查! 中分享您的想法。

欢迎!请查看 关于 页面了解有关该功能的更多信息。

+1
Clojure

core.async 有一个线程池,由 Java 系统属性 "clojure.core.async.pool-size" 限制。但以下简单的 REPL 交互(简要地)创建了成千上万的 Java 线程。

(require '[clojure.core.async :as a])
(def p (a/promise-chan))
(def a (atom 0))
(def b (atom 0))
(def cc (into [] (repeatedly 10000 (fn []
                                     (a/go
                                       (swap! a inc)
                                       (a/<! p)
                                       (swap! b inc))))))

在我注意到 (def cc...) 的一刹那,我注意到,在另一个终端中运行的 jstack 展示了成千上万 Java 线程的短暂增加。

问题:何时使用固定大小的线程池,而非为每个 "go" 创建一个 Java 线程?

1 答案

0

除非你明确设置为某个很大的数字,否则不应该是这种情况。
默认为8:https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/exec/threadpool.clj#L20

你是如何“计数”线程的?

by
编辑 by
来自运行上述代码的 visual vm:活动线程 8081 和守护线程 8080


并输出到 repl

> 异常发生在线程 "async-dispatch-133" 异常发生在线程 "async-dispatch-132" 异常发生在线程 "async-dispatch-131" 异常发生在线程 "async-dispatch-130" 异常发生在线程 "async-dispatch-128" 异常发生在线程 "async-dispatch-127" 异常发生在线程 "async-dispatch-126" java.lang.AssertionError: AssertionError: 每个通道允许的最大挂起请求不超过 1024。
(< (.size takes) impl/MAX-QUEUE-SIZE)
    在 clojure.core.async.impl.channels.ManyToManyChannel.take_BANG_(channels.clj:235)
    在 clojure.core.async.impl.ioc_macros$take_BANG_.invokeStatic(ioc_macros.clj:988)
    在 clojure.core.async.impl.ioc_macros$take_BANG_.invoke(ioc_macros.clj:987)
    在 investigate$fn__9405$fn__9414$state_machine__6606__auto____9415$fn__9417.invoke(NO_SOURCE_FILE:1)
    在 investigate$fn__9405$fn__9414$state_machine__6606__auto____9415.invoke(NO_SOURCE_FILE:1)
    在 clojure.core.async.impl.ioc_macros$run_state_machine.invokeStatic(ioc_macros.clj:978)
    在 clojure.core.async.impl.ioc_macros$run_state_machine.invoke(ioc_macros.clj:977)
    在 clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invokeStatic(ioc_macros.clj:982)
    在 clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invoke(ioc_macros.clj:980)
    在 investigate$fn__9405$fn__9414.invoke(NO_SOURCE_FILE:1)
    在 clojure.lang.AFn.run(AFn.java:22)
    在 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    在 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    在 clojure.core.async.impl.concurrent$counted_thread_factory$reify__479$fn__480.invoke(concurrent.clj:29)
    在 clojure.lang.AFn.run(AFn.java:22)
    在 java.base/java.lang.Thread.run(Thread.java:830)


... 看起来像上面的几百行代码...

异常发生在线程 "async-dispatch-99" [37.890s][warning][os,thread] 无法启动线程 - pthread_create 失败 (EAGAIN) 对于属性:stacksize: 1024k, guardsize: 4k, detached.
异常发生在线程 "async-dispatch-8177" java.lang.OutOfMemoryError: 无法创建本地线程:可能是内存不足或达到了进程/资源限制。

 您确实创建了 8000 个线程,但同时在任何时候只有 8 个是活动的。您积累了大量的未被满足的挂起请求数,达到了 core.async 的 1024 个限制。  https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/protocols.clj#L13

因此,您正在执行的操作是创建成千上万的线程并立即销毁它们,而更多的线程则基于以下 FixedThreadPool 跃入漏洞。

```
(defn thread-pool-executor
  ([]
    (thread-pool-executor nil))
  ([init-fn]
   (let [executor-svc (Executors/newFixedThreadPool
                        @pool-size
                        (conc/counted-thread-factory "async-dispatch-%d" true
                          {:init-fn init-fn}))]
     (reify impl/Executor
       (实现/执行 [this r]
         (.execute executor-svc ^Runnable r))))))
```
@dpsutton 我期望我的代码片段将 10000 件东西放入队列,由固定大小为 8 的线程池服务,而不是创建 10000 个线程。 你的评论让我看了 Executors 的 javadoc,它说:“如果任何线程在关闭之前由于在执行过程中失败而终止,如果需要执行后续任务,将会有一位新的线程来取代它。” 这可以解释为什么在堆中从垃圾回收器中回收了大约 9000 个 Thread 对象,但这并没有解释为什么在 Executor 用新线程替换后,VM 中会有这么多实际的线程+堆栈悬而未决。 也许系统线程会一直存在,直到垃圾回收最终确定 Thread 对象?
...