2024 Clojure 状态调查!中分享您的想法。

欢迎!请访问关于页面获取更多关于如何使用本站的信息。

+1 投票
Clojure

core.async 有一个由 Java 系统属性“clojure.core.async.pool-size”限制的线程池。但是,以下简短的交互式协议(REPL)中的小交互(简短地)创建了成千上万的 Java 线程。

(require '[clojure.core.async :as a])
(def p (a/promise-chan))
(def a (atom 0))
(def b (atom 0))
(def cc (into [] (repeatedly 10000 (fn []
                                     (a/go
                                       (swap! a inc)
                                       (a/<! p)
                                       (swap! b inc))))))

(def cc...) 的时候,我发现(在另一个终端),jstack 显示了成千上万 Java 线程的短暂增加。

问题:何时使用固定大小的线程池,何时为每个 "go" 创建 Java 线程?

1 答案

0 投票
by

除非你将其显式地设置为一个很大的数字,否则不应该如此。
默认值为 8:https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/exec/threadpool.clj#L20

你如何“计数”线程的?


编辑
从运行上述代码的Visual VM获得:活动线程8081和守护线程8080


并输出到repl

> 线程 "async-dispatch-133" 中发生异常,线程 "async-dispatch-132" 中发生异常,线程 "async-dispatch-131" 中发生异常,线程 "async-dispatch-130" 中发生异常,线程 "async-dispatch-128" 中发生异常,线程 "async-dispatch-127" 中发生异常,线程 "async-dispatch-126" 中发生异常 java.lang.AssertionError: 断言失败:一个通道上允许的挂起接受操作不超过1024个。
(< (.size takes) impl/MAX-QUEUE-SIZE)
   在 clojure.core.async.impl.channels.ManyToManyChannel.take_BANG_(channels.clj:235)
   在 clojure.core.async.impl.ioc_macros$take_BANG_.invokeStatic(ioc_macros.clj:988)
   在 clojure.core.async.impl.ioc_macros$take_BANG_.invoke(ioc_macros.clj:987)
   在 investigate$fn__9405$fn__9414$state_machine__6606__auto____9415$fn__9417.invoke(NO_SOURCE_FILE:1)
   在 investigate$fn__9405$fn__9414$state_machine__6606__auto____9415.invoke(NO_SOURCE_FILE:1)
   在 clojure.core.async.impl.ioc_macros$run_state_machine.invokeStatic(ioc_macros.clj:978)
   在 clojure.core.async.impl.ioc_macros$run_state_machine.invoke(ioc_macros.clj:977)
   在 clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invokeStatic(ioc_macros.clj:982)
   在 clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invoke(ioc_macros.clj:980)
   在 investigate$fn__9405$fn__9414.invoke(NO_SOURCE_FILE:1)
   在 clojure.lang.AFn.run(AFn.java:22)
   在 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   在 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   在 clojure.core.async.impl.concurrent$counted_thread_factory$reify__479$fn__480.invoke(concurrent.clj:29)
   在 clojure.lang.AFn.run(AFn.java:22)
   在 java.base/java.lang.Thread.run(Thread.java:830)


... 看起来像上面的几百行...

线程 "async-dispatch-99" 中发生异常 [37.890s][warning][os,thread] 无法启动线程 - pthread_create失败(EAGAIN),对于属性:stacksize:1024k,guardsize:4k,detached。
线程 "async-dispatch-8177" 发生 java.lang.OutOfMemoryError: 无法创建本地线程:可能内存不足或已达到进程/资源限制

 确实你在创建8000个线程,但你任何时候只保留8个活动线程。你堆积了大量的未满足的挂起接受操作,触发了core.async的1024个限制。  https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/protocols.clj#L13

所以你的做法是创建成千上万的线程并立即杀死它们,还有更多的线程基于以下FixedThreadPool跳进来

```
(defn thread-pool-executor
  ([]
    (thread-pool-executor nil))
  ([init-fn]
    (let [executor-svc (Executors/newFixedThreadPool
                        @pool-size
                        (conc/counted-thread-factory "async-dispatch-%d" true
                          {:init-fn init-fn}))]
     (reify impl/Executor
       (impl/exec [this r]
         (.execute executor-svc ^Runnable r))))))
```
@dpsutton 我原本预期我的代码片段会将10000个对象放入队列,并由固定大小为8的线程池提供服务;而不是创建10000个线程。您的评论让我查阅了Executors的javadoc,其中提到:“如果有任何线程因执行过程中的失败而终止,在shutdown之前,如果需要执行后续任务,将会有新的线程取代它。”这可以解释为什么有大约9000个Thread对象需要从堆中垃圾回收,但它没有解释为什么在Executor用新线程替换后,在虚拟机(VM)中仍有这么多实际的线程+栈 lingering。或许操作系统线程会一直存在,直到垃圾回收最终确定Thread对象?
...