在< ак style="color:#34495e;" href="https://www.surveymonkey.com/r/clojure2024">2024年 Clojure 状态调查问卷中分享您的想法!

欢迎!请参阅关于页面以了解有关此工作的更多信息。

+14
Java 互操作
已关闭

我在这里提问,因为我无法在https://clojure.atlassian.net/jira/software/c/projects/CLJ/issues/CLJ-2771上发帖。

我经营着一家初创公司,我们在使用 Java 19 虚拟线程时遇到了这个问题。

https://github.com/clojure/clojure/blob/4090f405466ea90bbaf3addbe41f0a6acb164dbb/src/jvm/clojure/lang/Delay.java#L35

调试这个问题需要大量的工程努力和理智,因为-Djdk.tracePinnedThreads报告没有任何信息。

目前,我们需要重新实现一个虚拟线程友好的(非-synchronized)clojure.core/delay,以及其他许多clojure.core函数,以确保虚拟线程不会被固定。

一如既往,我被Clojure团队的辛勤工作所折服!只是想强调一些API在虚拟线程上的不友好性,希望能优先处理,因为虚拟线程非常实用。

谢谢!

已关闭,备注:在1.12.0-alpha5中完成

2 个答案

+2
 
最佳答案

Clojure 1.12.0-alpha5 已发布,对 lazy-seq 和 delay 进行了修改,改为使用锁而不是 synchronized。

0 投票

谢谢,非常感兴趣为虚拟线程准备 Clojure。我想知道您的 delay 中有什么?

很高兴听到这个消息!最初是处理 HTTP 响应后(其本身是由 HttpKit 返回的 Promise)。为了消除一些变量,我移除了 HTTP 请求,添加了一个尚未实现的 "dummy" Promise,并将 `delay` 的体简单设置为 `(deref p <timeout> nil)`。 lowers of `<timeout>` ——1-10 ms ——几乎没有问题。更高的值会越来越频繁地导致由于 "虚拟线程饥饿"。

使用 `ReentrantLock` 自定义 `Delay` 实现覆盖 `clojure.lang.Delay` 解决了问题。乐于分享我们的 Java 类 —— 它在平台和虚拟线程上均通过了测试 https://github.com/clojure/clojure/blob/master/test/clojure/test_clojure/delays.clj。 (它还有一项优化,避免了在 `deref` 时引用 `volatile` 值字段,如果可以引用非同步的一个的话。)
虚拟线程中使用 synchronized 的主要缺点是围绕阻塞的同步,所以在围绕这一块使用 delayed 似乎可能是一个问题。我已经检查了核心中的所有同步,实际上只有少数几个看起来是问题(这是其中一个)。

"如果在可能的情况下引用非同步的一个,则避免在 `deref` 上引用 `volatile` 值字段"听起来像是 Java 中的经典并发错误(类似双重检查锁定),但可能会根据使用情况而正常。只是记住没有任何保证其他线程会看到共享非volatile、非同步状态的更改。
by
同意关于`synchronized`的说法。

关于可能存在的并发错误,观点很好。以下是相关代码。特别是`deref`使用了`volatile`/非同步策略

```
public class Delay implements IDeref, IPending {
  volatile Object value;
  Object unsynchronizedValue;
  ReentrantLock lock;

  private static class ExceptionalValue {
    private final Object v;

    public ExceptionalValue (Object v) {
      this.v = v;
    }
  }

  private static class PendingValue {
    private final Object v;

    public PendingValue (Object v) {
      this.v = v;
    }
  }

  public Delay(IFn f){
    Object v = new PendingValue(f);
    unsynchronizedValue = v;
    value = v;
    lock = new ReentrantLock();
  }

  private static Object getOrThrow(Object v) {
    if (v instanceof ExceptionalValue) {
      return Util.sneakyThrow((Throwable)(((ExceptionalValue)v).v));
    } else {
      return v;
    }
  }

  public Object deref() {
    if (unsynchronizedValue instanceof PendingValue) {
      // Volatile读
      Object v = value;

      if (v instanceof PendingValue) {
        try {
          lock.lock();

          // 加锁后进行Volatile读,以防更改
          Object vAfter = value;

          if (vAfter instanceof PendingValue) {
            IFn f = (IFn)(((PendingValue)vAfter).v);

            Object vComputed = null;
            try {
              vComputed = f.invoke();
            } catch (Throwable t) {
              vComputed = new ExceptionalValue(t);
            }

            unsynchronizedValue = vComputed;
            value = vComputed;
            return getOrThrow(vComputed);
          }
            return getOrThrow(vAfter);
        }
        finally {
          lock.unlock();
       }
      }
        return v;
  }
    } else {
  return getOrThrow(unsynchronizedValue);
    }
  }

  ... // `force`和`isRealized`在此实现
}
```
您在锁下写入unsynchronizedValue,但读取不在锁下,因此无法保证其他线程能够看到该写操作。这是一个线程可见性问题,类似于经典的双重检查锁定模式。我不会尝试以这种方式作弊,但在此处使用可重入读-写锁可能是有用的。
您是对的——不应该期望读者*必须*获取`unsynchronizedValue`的更改。话虽如此,如果一个线程未能获取更改,它将回退到读取`value`(它是`volatile`的),这将确保始终是最新的。在我们的测试中,大约40%的读取都是在读取`value`之前获取`unsynchronizedValue`的,因此在这方面(以及其他内部性能测试)优化似乎是有价值的。在所有情况下,`deref()`都返回了正确的结果。

`ReentrantReadWriteLock`是一个很有趣的探索建议——谢谢!
我想了解原始有问题的代码做了什么。是“HTTP响应后处理”本身运行了更多的IO?比如分派请求?
我们使用虚拟线程的场景,正如你所猜想的,是I/O。我说“曾是”是因为——我们这样使用时,虚拟线程太过不可靠。它们会导致死锁,调试起来非常痛苦,把我们大概一个月的开发时间都浪费了。我们后来转而使用在大型线程池之上的自定义`future`宏。)我们使用`http-kit`进行HTTP请求分发,并对响应进行后处理。我们需要在1)不阻塞调用线程的同时,2)不耗尽`clojure.core/future`线程池的情况下完成所有这些工作。我们觉得,虽然我们可以(也确实)使用一个解决方案来解决这个问题,但虚拟线程可能是一个很好的解决方案,所以我们转向使用它。

以下是我们的代码示例
```clojure
;; 在一个库函数中
(defn request-and-post-process [request]
  (let [response (http-kit/http-request request)]
    ;; 我们使用`delay`来保持输出的可解析性,同时避免使用线程
    (delay (post-process @response))))

;; 在一个业务逻辑函数中
;; 将在上游被`deref`
(defn async-concurrently-make-requests [requests]
  (vthread
    (->> requests
        ;; 并发启动所有请求
        (mapv #(vthread @(request-and-post-process %)))
        ;; 合并
        (mapv deref))))
```

注意,我们使用了`clojure.core/delay`。结果显示,`clojure.lang.Delay`在其`deref`方法中使用`synchronized`,这会锁定虚拟线程。解析`delay`将使得调用线程等待HTTP请求和后处理运行的时间,在某些情况下可能长达30秒。我们在这一行为中不仅看到了锁定,还看到了死锁,这是出乎意料的。(锁定问题是`clojure.lang.Delay`的问题;死锁问题可能是底层JVM对虚拟线程的实现问题,据我所知。我认为在bugs.openjdk.org上对此有一个开放的工单。)为了避免锁定,我们使用了基于`ReentrantLock`的`delay`。然而,例如在JSON反序列化库中,仍然存在其他锁定问题,尽管我们增强了`delay`,但似乎仍然让我们陷入死锁状态。

请告诉我,这一切是否回答了你的问题!我很乐意进一步阐述。
by
关于看起来是JVM中一个错误的死锁问题,这已经在JDK 21中解决了吗?还是这只是JDK 19虚拟线程预览版中存在的错误?
...