2024 Clojure 状态调查!中分享您的想法。

欢迎!有关此功能更多信息的请查看关于页面。

+14
Java Interop
已关闭

我在这里提问是因为我无法在 https://clojure.atlassian.net/jira/software/c/projects/CLJ/issues/CLJ-2771 上发布。

我经营一家初创公司,我们在使用 Java 19 虚拟线程时遇到了这个 synchronized 块。

https://github.com/clojure/clojure/blob/4090f405466ea90bbaf3addbe41f0a6acb164dbb/src/jvm/clojure/lang/Delay.java#L35

由于 -Djdk.tracePinnedThreads 没有报告任何内容,调试这个消耗了大量工程努力和时间。

目前,我们需要重新实现一个虚拟线程友好的(非 synchronizedclojure.core/delay,以及尽可能多的 clojure.core 函数,以确保虚拟线程不会被固定。

一如既往,我对 Clojure 团队的工作感到惊讶!只是想强调一些 API 的虚拟线程不友好性,并在可能的情况下将其优先化,因为虚拟线程非常有用。

谢谢!

已关闭,备注: 在 1.12.0-alpha5 中完成

2 个回答

+2
 
最佳答案

Clojure 1.12.0-alpha5 现已可用,并修改了 lazy-seq 和 delay 以使用锁而不再使用 synchronized。

0

谢谢,非常感兴趣为虚拟线程准备Clojure。我很想知道你延期任务中有什么内容?

很高兴听到这个消息!最初是处理HTTP响应(它本身是HttpKit返回的承诺)。然后,为了消除一些变量,我移除了HTTP请求,添加了一个尚未实现的“虚拟”承诺,并将`delay`的主体改为`(deref p <timeout> nil)`。低`<timeout>`值(1-10毫秒)几乎从未显示任何问题。更高的值越来越有可能由于锁定而导致“虚拟线程饥饿”。

通过使用`ReentrantLock`自定义`Delay`实现覆盖`clojure.lang.Delay`修复了这个问题。很高兴分享我们的Java类 —— 它在平台和虚拟线程上都可以通过测试https://github.com/clojure/clojure/blob/master/test/clojure/test_clojure/delays.clj。(它还有一些优化,可以避免在`deref`中对`volatile`值字段进行引用,如果可以引用未同步的一个的话。)
虚拟线程中使用`synchronized`的主要问题是围绕阻塞进行同步,所以肯定在延迟部分引发了一个可能的问题。我已经检查了核心中的所有同步,实际上只有几个看起来有问题(这是其中之一)。

“如果可以引用未同步的一个,则避免在`deref`中引用`volatile`值字段”听起来像是Java中经典的并发错误(如同双检查锁定),但这可能取决于使用情况。只是记得,没有任何保证其他线程将看到对共享的非volatile、非synchronized状态的更改。
同意`synchronized`的问题。

关于可能的并发错误的良好观察。以下是相关代码。特别是`deref`使用了`volatile`/非同步策略。

```
public class Delay implements IDeref, IPending {
  volatile Object value;
  Object unsynchronizedValue;
  ReentrantLock lock;

  private static class ExceptionalValue {
    private final Object v;

    public ExceptionalValue (Object v) {
      this.v = v;
    }
  }

  private static class PendingValue {
    private final Object v;

    public PendingValue (Object v) {
      this.v = v;
    }
  }

  public Delay(IFn f){
    Object v = new PendingValue(f);
    unsynchronizedValue = v;
    value = v;
    lock = new ReentrantLock();
  }

  private static Object getOrThrow(Object v) {
    if (v instanceof ExceptionalValue) {
      return Util.sneakyThrow((Throwable)(((ExceptionalValue)v).v));
    } else {
      return v;
    }
  }

  public Object deref() {
    if (unsynchronizedValue instanceof PendingValue) {
      // Volatile读取
      Object v = value;

      if (v instanceof PendingValue) {
        try {
          lock.lock();

          // 在获取锁后进行volatile读取,以防它已更改
          Object vAfter = value;

          if (vAfter instanceof PendingValue) {
            IFn f = (IFn)(((PendingValue)vAfter).v);

            Object vComputed = null;
            try {
              vComputed = f.invoke();
            } catch (Throwable t) {
              vComputed = new ExceptionalValue(t);
            }

            unsynchronizedValue = vComputed;
            value = vComputed;
            return getOrThrow(vComputed);
            } else {
            return getOrThrow(vAfter);
         }
        try {
          lock.unlock();
        }
        }
        return v;
      }
    } else {
      return getOrThrow(unsynchronizedValue);
    }
  }

  ... // `force`, `isRealized`实现在此处
}
```
by
您在锁下写入unsynchronizedValue,但读取操作不在锁下,因此无法保证其他线程实际上会看到该写入。这是一个线程可见性问题,类似于经典的双重检查锁定模式的问题。我不建议以这种方式进行欺骗,但在这里使用ReentrantReadWriteLock可能很有用。
您是对的 —— 读者没有义务必须检测到对 `unsynchronizedValue` 的更改。但即便如此,如果线程没有检测到更改,它会回退到读取 `value`(它是一个 `volatile`),这保证了它始终是最新的。在我们的测试中,大约40%的读取是在读取 `value` 之前检测到 `unsynchronizedValue` 的,从这个角度来看(以及基于其他内部性能测试),这种优化看起来是有价值的。在所有情况下,`deref()` 都返回了正确的结果。

“可重入的读写锁”(ReentrantReadWriteLock)是一个值得探索的有意思的建议 —— 感谢您!
我想了解原始有问题的代码是做什么的。例如,“对HTTP响应的后期处理”本身是否运行了更多的I/O?比如分发请求?
我们的虚拟线程的使用场景,正如您所怀疑的,是 I/O。(我说“是”因为对我们来说虚拟线程太不可靠了。它会造成令人难以调试的死锁,并消耗我们大约一个月的开发时间,包括所有时间。我们后来转而使用了一个自定义的 `future` 宏,在大型线程池之上。)我们使用 `http-kit` 进行 HTTP 请求分发,并对响应进行后期处理。我们需要所有这些操作 1) 不阻塞调用线程,2) 不耗尽 `clojure.core/future` 线程池。我们觉得,尽管我们可以(并且确实)使用一个解决方案,但虚拟线程可能是一个好选择,所以我们转向了它。

基本上,这里是我们所做的工作的代码
```clojure
;; 在一个库函数中
(defn request-and-post-process [request]
  (let [response (http-kit/request request)]
    ;; 我们使用 `delay` 来保持它是一个可解引用输出,同时避免使用线程
    (delay (post-process @response))))

;; 在一个业务逻辑函数中
;; 将会在上游被 `deref`
(defn async-concurrently-make-requests [requests]
  (vthread
    (->> requests
        ;; 并发启动所有请求
        (mapv #(vthread @(request-and-post-process %)))
        ;; 聚合
        (mapv deref))))
```

请注意使用了 `clojure.core/delay`。结果证明,`clojure.lang.Delay` 在其 `deref` 方法中使用了 `synchronized`(请参阅 Clojure Q&A 线程),这会锁定虚拟线程。对 `delay` 进行 `deref` 会使调用虚拟线程等待 HTTP 请求和后续处理所需的时间,在某些情况下可能长达 30 秒。我们不仅看到了锁定行为,而且还出现了意外的死锁。%(锁定问题是 `clojure.lang.Delay` 的一个问题;根据我的了解,死锁问题是虚拟线程底层 JVM 实现的问题。我怀疑在 bugs.openjdk.org 上对此有一个尚未解决的问题。为了避免锁定,我们使用了基于自定义 `ReentrantLock` 的 `delay`。然而,在 JSON 反序列化库等其他地方还发生了锁定,这在 `delay` 优化后似乎仍然导致了死锁区域。)

如果这些能回答你的问题,请告诉我!我很乐意进一步解释。
关于看似在 JVM 中存在的一个死锁问题的bug,是否已由 JDK 21解决,或者这仅仅是 JDK 19 虚拟线程预览中的 bug?
...