请在2024年Clojure状态调查!中分享您的想法。

欢迎!请访问关于页面以获取更多有关此操作的信息。

+1
Clojure

core.async有一个线程池,其大小由Java系统属性"clojure.core.async.pool-size"限制。但下面的简单REPL交互(简短地)创建了数千个Java线程。

(require '[clojure.core.async :as a])
(def p (a/promise-chan))
(def a (atom 0))
(def b (atom 0))
(def cc (into [] (repeatedly 10000 (fn []
                                     (a/go
                                       (swap! a inc)
                                       (a/<! p)
                                       (swap! b inc))))))

在我注意到(def cc...)之后,我注意到(在另一个终端中),jstack显示了数千个Java线程的短暂增加。

问题:何时使用固定大小的线程池,何时为每个"go"创建Java线程?

1 个回答

0
by

除非您明确地将其设置为某个大数字,否则情况应该不是这样。
默认值为8:[代码链接](https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/exec/threadpool.clj#L20)

你是如何“数”你的线程的?

by
编辑 by
来自运行上述代码的visual vm:活动线程8081和守护线程8080


并将异常信息输出到repl

java.lang.AssertionError: Assert failed: 在单个通道上不允许超过1024个挂起的请求。
(<.size requests) impl/MAX-QUEUE-SIZE)
    在 clojure.core.async.impl.channels.ManyToManyChannel.take_BANG_(channels.clj:235)
    在 clojure.core.async.impl.ioc_macros$take_BANG_.invokeStatic(ioc_macros.clj:988)
    在 clojure.core.async.impl.ioc_macros$take_BANG_.invoke(ioc_macros.clj:987)
    在 investigate$fn__9405$fn__9414$state_machine__6606__auto____9415$fn__9417.invoke(NO_SOURCE_FILE:1)
    在 investigate$fn__9405$fn__9414$state_machine__6606__auto____9415.invoke(NO_SOURCE_FILE:1)
    在 clojure.core.async.impl.ioc_macros$run_state_machine.invokeStatic(ioc_macros.clj:978)
    在 clojure.core.async.impl.ioc_macros$run_state_machine.invoke(ioc_macros.clj:977)
    在 clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invokeStatic(ioc_macros.clj:982)
    在 clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invoke(ioc_macros.clj:980)
    在 investigate$fn__9405$fn__9414.invoke(NO_SOURCE_FILE:1)
    在 clojure.lang.AFn.run(AFn.java:22)
    在 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    在 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    在 clojure.core.async.impl.concurrent$counted_thread_factory$reify__479$fn__480.invoke(concurrent.clj:29)
    在 clojure.lang.AFn.run(AFn.java:22)
    在 java.base/java.lang.Thread.run(Thread.java:830)


... 看起来像以上的代码数百行...

线程 "async-dispatch-99" 中发生异常 [37.890s][警告][os,thread] 无法启动线程 - pthread_create失败(EAGAIN),属性为:stacksize: 1024k, guardsize: 4k, detached。
线程 "async-dispatch-8177" java.lang.OutOfMemoryError: 无法创建本地线程:可能是内存不足或已达到进程/资源限制。

确实,虽然你创建了8000个线程,但任何时候只有8个是活跃的。你在累积大量的挂起请求,这些请求无法满足,并触发了core.async的1024个限制。  https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/protocols.clj#L13

所以你做了的是创建成千上万的线程并立即杀死它们,基于下面的FixedThreadPool有更多线程进入MQ。

```
(defn thread-pool-executor
  ([]
    (thread-pool-executor nil))
  ([init-fn]
    (let [executor-svc (Executors/newFixedThreadPool
                         @pool-size
                          (conc/counted-thread-factory "async-dispatch-%d" true
                          {:init-fn init-fn}))]
     (reify impl/Executor
       (impl/exec [this r]
         (.execute executor-svc ^Runnable r))))))
```
@dpsutton 我原本期望我的代码片段将 10000 项放入队列中,由一个固定大小的 8 个线程池来服务;而不是创建 10000 个线程。  您的评论引导我查阅了 Executors 的 javadoc,其中说明:"如果在关机前由于执行期间的失败而终止了任何线程,如果需要执行后续任务,将用新的线程取代它"  这解释了大约 9000 个从堆中垃圾回收的 Thread 对象,但这并没有解释为什么在 Executor 用一个新线程替换后,仍有如此多的实际线程+栈在 VM 中滞留。  也许操作系统的线程会一直存在,直到垃圾收集最终确定 Thread 对象?
...