来自运行上述代码的visual vm:活动线程8081和守护线程8080
并将异常信息输出到repl
java.lang.AssertionError: Assert failed: 在单个通道上不允许超过1024个挂起的请求。
(<.size requests) impl/MAX-QUEUE-SIZE)
在 clojure.core.async.impl.channels.ManyToManyChannel.take_BANG_(channels.clj:235)
在 clojure.core.async.impl.ioc_macros$take_BANG_.invokeStatic(ioc_macros.clj:988)
在 clojure.core.async.impl.ioc_macros$take_BANG_.invoke(ioc_macros.clj:987)
在 investigate$fn__9405$fn__9414$state_machine__6606__auto____9415$fn__9417.invoke(NO_SOURCE_FILE:1)
在 investigate$fn__9405$fn__9414$state_machine__6606__auto____9415.invoke(NO_SOURCE_FILE:1)
在 clojure.core.async.impl.ioc_macros$run_state_machine.invokeStatic(ioc_macros.clj:978)
在 clojure.core.async.impl.ioc_macros$run_state_machine.invoke(ioc_macros.clj:977)
在 clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invokeStatic(ioc_macros.clj:982)
在 clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invoke(ioc_macros.clj:980)
在 investigate$fn__9405$fn__9414.invoke(NO_SOURCE_FILE:1)
在 clojure.lang.AFn.run(AFn.java:22)
在 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
在 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
在 clojure.core.async.impl.concurrent$counted_thread_factory$reify__479$fn__480.invoke(concurrent.clj:29)
在 clojure.lang.AFn.run(AFn.java:22)
在 java.base/java.lang.Thread.run(Thread.java:830)
... 看起来像以上的代码数百行...
线程 "async-dispatch-99" 中发生异常 [37.890s][警告][os,thread] 无法启动线程 - pthread_create失败(EAGAIN),属性为:stacksize: 1024k, guardsize: 4k, detached。
线程 "async-dispatch-8177" java.lang.OutOfMemoryError: 无法创建本地线程:可能是内存不足或已达到进程/资源限制。
确实,虽然你创建了8000个线程,但任何时候只有8个是活跃的。你在累积大量的挂起请求,这些请求无法满足,并触发了core.async的1024个限制。
https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/protocols.clj#L13
所以你做了的是创建成千上万的线程并立即杀死它们,基于下面的FixedThreadPool有更多线程进入MQ。
```
(defn thread-pool-executor
([]
(thread-pool-executor nil))
([init-fn]
(let [executor-svc (Executors/newFixedThreadPool
@pool-size
(conc/counted-thread-factory "async-dispatch-%d" true
{:init-fn init-fn}))]
(reify impl/Executor
(impl/exec [this r]
(.execute executor-svc ^Runnable r))))))
```