2024 Clojure状态调查!分享你的想法。

欢迎!请查阅关于页面了解更多该网站的工作方式信息。

+14 投票
Java互操作 是由
已关闭

在这里提问是因为我无法在https://clojure.atlassian.net/jira/software/c/projects/CLJ/issues/CLJ-2771上发表。

我在运行一个初创企业时遇到了在Java 19虚拟线程中使用synchronized块的问题。

https://github.com/clojure/clojure/blob/4090f405466ea90bbaf3addbe41f0a6acb164dbb/src/jvm/clojure/lang/Delay.java#L35

这需要大量的工程努力和精神去调试,因为-Djdk.tracePinnedThreads报告了确切的零。

目前,我们需要重新实现一个虚拟线程友好的(非synchronizedclojure.core/delay,以及尽可能地多的clojure.core函数,确保虚拟线程不会被固定。

一如既往,对Clojure团队的工作感到惊叹!只是想强调一些API的虚拟线程不友好性,如果可能的话,将其列入优先事项,因为虚拟线程非常有用。

谢谢!

以以下备注关闭: 在1.12.0-alpha5中完成

2 答案

+2 投票
 
最佳答案

Clojure 1.12.0-alpha5 现已发布,现在使用互斥锁而不是同步来修改 lazy-seq 和 delay。

0

谢谢,绝对有兴趣为虚拟线程准备 Clojure。我想知道在你的 delay 中有什么?

很高兴听到这一点!最初是对 HTTP 响应后处理(它本身是 HttpKit 返回的承诺)。然后,为了消除一些变量,我删除了 HTTP 请求,添加了一个未实现的“虚拟”承诺,并将 `delay` 主体简单地更改为 `(deref p <timeout> nil)`。`` 的低值 —— 1-10 毫秒 —— 几乎从未显示出任何问题。较高的值会越来越频繁地导致因“虚拟线程固定”而导致的“虚拟线程饥饿”。

使用带 `ReentrantLock` 的自定义 `Delay` 实现覆盖 `clojure.lang.Delay` 修复了问题。很高兴分享我们的 Java 类 —— 它在平台和虚拟线程上都可以通过 https://github.com/clojure/clojure/blob/master/test/clojure/test_clojure/delays.clj。(它还有一个优化,使它能够避免在 `deref` 中如果可以引用非同步的一个的话,则引用 `volatile` 值字段。)
关于虚拟线程中的 synchronized 的主要问题在于围绕阻塞的同步,因此围绕该部分的延迟绝对可能是一个问题。我已经查看内核中的所有同步,实际上只有少数几个看起来有问题(这是其中之一)。

“如果可以引用非同步的一个,则避免在 `deref` 中引用 `volatile` 值字段”听起来像是 Java 中典型的并发错误(类似于双重检查锁定),但根据使用情况可能会有所不同。只需记住,没有任何保证任何其他线程将看到对共享的非volatile、非同步状态的更改。
by
关于`synchronized`同意。

关于可能并发错误的观察很好。以下是相关代码。特别地,`deref`使用了`volatile`/非同步策略。

```
public class Delay implements IDeref, IPending {
  volatile Object value;
  Object unsynchronizedValue;
  ReentrantLock lock;

  private static class ExceptionalValue {
    private final Object v;

    public ExceptionalValue (Object v) {
      this.v = v;
    }
  }

  private static class PendingValue {
    private final Object v;

    public PendingValue (Object v) {
      this.v = v;
    }
  }

  public Delay(IFn f){
    Object v = new PendingValue(f);
    unsynchronizedValue = v;
    value = v;
    lock = new ReentrantLock();
  }

  private static Object getOrThrow(Object v) {
    if (v instanceof ExceptionalValue) {
      return Util.sneakyThrow((Throwable)(((ExceptionalValue)v).v));
    } else {
      return v;
    }
  }

  public Object deref() {
    if (unsynchronizedValue instanceof PendingValue) {
      volatile读取
      Object v = value;

      if (v instanceof PendingValue) {
        try {
           lock.lock();

           volatile读取在锁定后,以供变更使用
           Object vAfter = value;

           如果(vAfter instanceof PendingValue) {
            IFn f = (IFn)((PendingValue)vAfter).v;

            Object vComputed = null;
            try {
              vComputed = f.invoke();
            } catch (Throwable t) {
              vComputed = new ExceptionalValue(t);
            

            unsynchronizedValue = vComputed;
            value = vComputed;
            return getOrThrow(vComputed);
          } else {
            return getOrThrow(vAfter);
         }
        >finally {
           lock.unlock();
        
        }
        else {
        return v;
    } else {
      }
    }
  }

  ... // 在这里实现`force`,`isRealized`
}
```
在你的代码中,你在加锁的情况下写入unsynchronizedValue,但读取操作没有加锁,这就无法保证其他线程能够实际看到那次写入。这是一个线程可见性问题,类似于经典的双重检查锁定模式。我不会尝试用这种方式欺骗,但在这里使用ReentrantReadWriteLock可能是有益的。
你说得对——我们没有期望读取器必须获取对`unsynchronizedValue`的更改。然而,如果一个线程未能获取更改,它将回退到读取`value`(这是`volatile`),这会保证始终是最新的。在我们的测试中,大约40%的读取是在读取`value`之前获取到`unsynchronizedValue`的,因此从这个角度来看(以及基于其他内部性能测试),这种优化似乎是宝贵的。在所有情况下,`deref()`都返回了正确的结果。

探索`ReentrantReadWriteLock`的提议非常有趣——谢谢您!
我希望能了解最初有问题的代码做了什么。是否是“http响应的后处理”本身运行了更多的I/O操作,比如像广播请求一样?
使用虚拟线程的场景,正如你所疑虑的,是I/O。(我说“过去是”,因为像我们这样使用的虚拟线程——太不可靠了。它们会导致难以调试的死锁,并可能耗费我们整整一个月的开发时间。我们现在已经转换到在大型线程池之上使用自定义的`future`宏。)我们使用`http-kit`进行HTTP请求的广播式处理,然后对响应进行处理。我们需要完成所有这些工作,1)不会阻塞调用线程;2)不会耗尽`clojure.core/future`线程池。我们考虑到虽然我们可以(并确实)使用一个变通方法,但虚拟线程可以是一个不错的解决方案,所以我们转向了它。

以下是我们的代码实现
```clojure
;; 库函数中
(defn request-and-post-process [request]
  (let [response (http-kit/request request)]
    ;; 使用`delay`使它保持为可引用的输出,同时避免使用线程
    (delay (post-process @response))))

;; 业务逻辑函数中
;; 将在上游被`deref`
(defn async-concurrently-make-requests [requests]
  (vthread
    (->> requests
        ;; 并发启动所有请求
        (mapv #(vthread @(request-and-post-process %)))
        ;; 聚合
        (mapv deref))))
```

请注意,我们使用了`clojure.core/delay`。事实证明,`clojure.lang.Delay`在其`deref`方法中使用了`synchronized`(见Clojure问答论坛),这会阻塞虚拟线程。`deref`延迟会使调用虚拟线程等待HTTP请求和处理完成的时间,在某些情况下可能需要30秒。我们在这里不仅看到了阻塞行为,还发现了意料之外的死锁。%(这里的阻塞是`clojure.lang.Delay`的问题;死锁是关于虚拟线程的底层数据虚拟机实现的问题,据我所知。我认为在bugs.openjdk.org上有关于这个问题的开放票据。)为了避免阻塞,我们使用了基于我们的自定义`ReentrantLock`的`delay`。然而,还有其他形式的阻塞,例如在JSON反序列化库中,这使我们即使增强了`delay`仍然陷入了死锁区。

如果这些都回答了你的问题!我很乐意进一步解释。
by
关于似乎在JVM中是错误的死锁问题,在JDK 21中这个问题已经被解决了吗?还是这只属于JDK 19虚拟线程预览版的bug?
...