2024 年 Clojure 状态调查! 中分享您的想法。

欢迎!有关此功能的更多信息,请参考关于 页面。

+1 投票
Clojure

core.async 有一个线程池,由 Java 系统属性 "clojure.core.async.pool-size" 进行列限制。但以下小 REPL 交互(简要地说)创建了成千上万的 Java 线程。

(require '[clojure.core.async :as a])
(def p (a/promise-chan))
(def a (atom 0))
(def b (atom 0))
(def cc (into [] (repeatedly 10000 (fn []
                                     (a/go
                                       (swap! a inc)
                                       (a/<! p)
                                       (swap! b inc))))))

(def cc...) 立即注意到,在其他琐事中,我发现 jstack(在另一个终端)显示 Java 线程数量短时间内增加。

问题:何时使用固定大小的线程池,与为每个 "go" 创建一个 Java 线程相比?

1 答案

0 投票

除非您将其显式设置为某个巨大的数字,否则这种情况不应该发生。
默认为 8:https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/exec/threadpool.clj#L20

您是如何“计数”线程的?

by
编辑 by
从运行上述代码的Visual VM中获取:活动线程8081和守护线程8080


并导出到REPL

> 线程 "async-dispatch-133" 中抛出异常、线程 "async-dispatch-132" 中抛出异常、线程 "async-dispatch-131" 中抛出异常、线程 "async-dispatch-130" 中抛出异常、线程 "async-dispatch-128" 中抛出异常、线程 "async-dispatch-127" 中抛出异常、线程 "async-dispatch-126" 中抛出异常  java.lang.AssertionError: 断言失败:单个通道中不允许超过1024个待处理获取。
((< (.size takes) impl/MAX-QUEUE-SIZE))
   在 clojure.core.async.impl.channels.ManyToManyChannel.take_BANG_(channels.clj:235)
   在 clojure.core.async.impl.ioc_macros$take_BANG_.invokeStatic(ioc_macros.clj:988)
   在 clojure.core.async.impl.ioc_macros$take_BANG_.invoke(ioc_macros.clj:987)
   在 investigate$fn__9405$fn__9414$state_machine__6606__auto____9415$fn__9417.invoke(NO_SOURCE_FILE:1)
   在 investigate$fn__9405$fn__9414$state_machine__6606__auto____9415.invoke(NO_SOURCE_FILE:1)
   在 clojure.core.async.impl.ioc_macros$run_state_machine.invokeStatic(ioc_macros.clj:978)
   在 clojure.core.async.impl.ioc_macros$run_state_machine.invoke(ioc_macros.clj:977)
   在 clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invokeStatic(ioc_macros.clj:982)
   在 clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invoke(ioc_macros.clj:980)
   在 investigate$fn__9405$fn__9414.invoke(NO_SOURCE_FILE:1)
   在 clojure.lang.AFn.run(AFn.java:22)
   在 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   在 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   在 clojure.core.async.impl.concurrent(counted_thread_factory$reify__479$fn__480.invoke(concurrent.clj:29)
   在 clojure.lang.AFn.run(AFn.java:22)
   在 java.base/java.lang.Thread.run(Thread.java:830)


... 还有几百行类似上述的代码...

线程 "async-dispatch-99" 中抛出异常 [37.890s][警告][操作系统、线程] 无法启动线程 - pthread_create 失败 (EAGAIN),属性:栈大小:1024k,守卫大小:4k,分离。
线程 "async-dispatch-8177" 中抛出异常 java.lang.OutOfMemoryError: 无法创建本地线程:可能是因为内存不足或进程/资源限制已达到

 实际上,你创建了8000个线程,但任何时刻只有8个是活动的。你堆积了大量未经满足的待处理获取,达到了core.async的1024个限制。  https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/protocols.clj#L13

因此,你所做的是创建成千上万的线程然后立即杀死它们,基于以下FixedThreadPool有更多线程加入。

```
(defn thread-pool-executor
  ([]
    (thread-pool-executor nil))
  ([init-fn]
   (let [executor-svc (Executors/newFixedThreadPool
                        @pool-size
                        (conc/counted-thread-factory "async-dispatch-%d" true
                          {:init-fn init-fn}))]
     (reify impl/Executor
       (impl/exec [this r]
         (.execute executor-svc ^Runnable r))))))
```
@dpsutton 我原本期望我的代码片段能够将10000件东西放入队列,由大小为8的固定大小线程池提供服务,而不是创建10000个线程。您的评论让我查看了Executors的javadoc,其中说:“如果有任何线程在执行期间因失败而终止,在关闭之前,如果有必要执行后续任务,将占用其位置的新线程。”这可以解释大约有9000个线程对象从堆中回收,但这并不解释为什么在被Executor用新线程替换后,在VM中滞留了这么多实际线程+堆栈。也许操作系统线程会留在那里,直到垃圾收集最终确定Thread对象?
...