2024 Clojure 状态调查!中分享您的想法。

欢迎!请访问关于页面以了解有关如何操作的一些更多信息。

+14
Java 互操作
关闭

在此提问,因为我无法在https://clojure.atlassian.net/jira/software/c/projects/CLJ/issues/CLJ-2771上发布。

我运营一家初创公司,我们在使用Java 19虚拟线程时遇到了这个synchronized块。

https://github.com/clojure/clojure/blob/4090f405466ea90bbaf3addbe41f0a6acb164dbb/src/jvm/clojure/lang/Delay.java#L35

由于-Djdk.tracePinnedThreads报告了确切的无信息,调试这个耗时的工作需要大量的工程努力和理智。

目前,我们需要重新实现一个虚拟线程友好的(非synchronizedclojure.core/delay,以及我们需要的尽可能多的clojure.core函数,以确保虚拟线程不会被绑定。

一如既往,我惊叹于Clojure团队的工作!只是想突出其API中一些对虚拟线程不友好之处,并尽可能优先处理,因为虚拟线程极其有用。

谢谢!

以以下注解关闭: 在 1.12.0-alpha5 中完成

2 个答案

+2
 
最佳答案

Clojure 1.12.0-alpha5 已可用,并修改了lazy-seq和delay以使用锁而不是synchronized。

0

谢谢,一定对为虚拟线程准备Clojure感兴趣。我很想知道你的延迟内容是什么?

很高兴听到这个好消息!最初是处理HTTP响应(它本身是由 HttpKit 返回的一个承诺)。为了消除一些变量,我删除了HTTP请求,添加了一个未实现的“虚”承诺,并将`delay`的主体简单地设置为`(deref p <timeout> nil)`。`<timeout>`的低值一一1-10毫秒——几乎从未显示任何问题。高值会越来越频繁地导致“由于挂起而导致的虚拟线程饥饿”。

通过使用`ReentrantLock`重写`clojure.lang.Delay`,使用自定义`Delay`实现解决了这个问题。很高兴与大家分享我们拥有的Java类——它能够在平台和虚拟线程上通过测试 https://github.com/clojure/clojure/blob/master/test/clojure/test_clojure/delays.clj (它还有一个优化,使得它如果要引用未同步的一个,则避免在`deref`上引用一个`volatile`值字段。)
虚拟线程中`synchronized`的问题主要是围绕阻塞进行同步,所以在那个地方延迟看起来可能是一个潜在的问题。我已经检查了核心中的所有同步,但实际上只有一些看起来有问题(这个就是这样)。

“如果可能,避免在`deref`上引用一个`volatile`值字段,而要引用未同步的一个”听起来像是一个经典的Java并发错误(比如双重检查锁定),但可能会根据使用情况而有所改善。只是记住,没有保证任何其他线程都能看到共享的非`volatile`、非`synchronized`状态的变化。
by
关于`synchronized`表示同意。

对于可能的并发错误的观察很到位。以下是相关代码。特别是`deref`使用了`volatile`/非同步策略

```
public class Delay implements IDeref, IPending {
  volatile Object value;
  Object unsynchronizedValue;
  ReentrantLock lock;

  private static class ExceptionalValue {
    private final Object v;

    public ExceptionalValue (Object v) {
      this.v = v;
    }
  }

  private static class PendingValue {
    private final Object v;

    public PendingValue (Object v) {
      this.v = v;
    }
  }

  public Delay(IFn f){
    Object v = new PendingValue(f);
    unsynchronizedValue = v;
    value = v;
    lock = new ReentrantLock();
  }

  private static Object getOrThrow(Object v) {
    if (v instanceof ExceptionalValue) {
      return Util.sneakyThrow((Throwable)(((ExceptionalValue)v).v));
    } else {
      return v;
    }
  }

  public Object deref() {
    if (unsynchronizedValue instanceof PendingValue) {
      进行volatile读取
      Object v = value;

      if (v instanceof PendingValue) {
        try {
          lock.lock();

          在持有锁之后,volatile读取,以防它发生了变化
          Object vAfter = value;

          if (vAfter instanceof PendingValue) {
            IFn f = (IFn)(((PendingValue)vAfter).v);

            Object vComputed = null;
            try {
              vComputed = f.invoke();
            } catch (Throwable t) {
              vComputed = new ExceptionalValue(t);
            }

            unsynchronizedValue = vComputed;
            value = vComputed;
            return getOrThrow(vComputed);
          } else {
            return getOrThrow(vAfter);
          }
        } finally {
          lock.unlock();
        }
      } else {
        return v;
      }
    } else {
      return getOrThrow(unsynchronizedValue);
    }
  }

  ... // `force`, `isRealized` implemented here
}
```
by
您在锁下写入unsynchronizedValue,但读取操作不在锁下执行,因此无法保证其他线程实际上能够看到该写入。这是一个类似于经典双重检查锁定模式的问题,是线程可见性问题。我不建议这种方式作弊,但在这里可能使用ReentrantReadWriteLock会比较有用。
   by
您是对的——不应该期望读者“必须”获取对`unsynchronizedValue`的更改。但是,如果线程未能获取更改,则会回退到读取`value`(它被标记为`volatile`),这保证了它总是最新的。在我们的测试中,大约有40%的读取在读取`value`之前都会读取到`unsynchronizedValue`,因此在这方面(以及基于其他内部性能测试)优化似乎是有价值的。在所有情况下`deref()`都产生了正确的结果。

探索`ReentrantReadWriteLock`是一个有趣的建议——谢谢!
by
我想了解原始有问题的代码是如何执行的。所谓的“http响应后处理”本身是否在进行进一步的IO操作?比如扇出请求?
   by
我们使用Virtual Threads的场景,正如您所猜测的,是IO。我说“是”,因为(我们使用的)虚拟线程太不可靠了。它们会造成昂贵的死锁,调试起来十分困难,我们可能损失了一个月的开发时间。我们后来转换到了在大型线程池上使用自定义的`future`宏。)我们使用`http-kit`进行HTTP请求扇出,然后对这些响应进行后处理。我们需要这一切都做到:1)不阻塞调用线程;2)不耗尽`clojure.core/future`线程池。我们认为,虽然我们可以(并确实)使用一个修复程序,但是虚拟线程可能是一个很好的解决方案,因此我们转向了它。

基本上这就我们执行的操作的代码
```clojure
;; 在一个库函数中
(defn request-and-post-process [request]
  (let [response (http-kit/request request)]
    .;我们使用`delay`来将其保持为可展开的输出,同时避免使用线程
    (delay (post-process @response))))

;;在一个业务逻辑函数中
将在上游进行`deref`操作
(定义异步并发生成请求数据,参数为[requests])
  (使用vthread
    (->> requests
        ;; 同时启动所有请求
         (mapv #(vthread @(request-and-post-process %)))
         ;; 聚合
         (mapv deref))))
```

注意使用`clojure.core/delay`。事实证明,`clojure.lang.Delay`在它的`deref`方法中使用`synchronized`,这会固定虚拟线程。`deref` `delay`会让调用虚拟线程等待HTTP请求和后处理的运行时间,在某些情况下可能需要30秒。我们不仅看到了固定的行为,还看到了不可预期的死锁。固定的问题与`clojure.lang.Delay`有关;死锁的问题与我们所能理解的底层JVM虚拟线程的实施有关。我相信在bugs.openjdk.org上有针对此的开放票据。为了避免固定,我们使用了基于`ReentrantLock`的自定义`delay`。然而,例如JSON反序列化库中恰好有其他固定发生,尽管我们增强了`delay`,但它仍然使我们陷入死锁。

如果你觉得这些回答了你的问题,请告诉我!我很乐意进一步解释。
关于看起来是JVM错误的死锁问题,这已经在JDK 21中得到解决吗?还是这只是JDK 19虚拟线程预览中的错误?
...